Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/10/02 13:09:19 UTC

[GitHub] [pinot] KKcorps opened a new pull request, #9511: Handle exception in realtime decoder gracefully

KKcorps opened a new pull request, #9511:
URL: https://github.com/apache/pinot/pull/9511

   This PR continues the effort from #9163 
   
   We already added the `continueOnError` flag in previous PRs to handle errors gracefully. This PR extends the same config to handle decoder exceptions as well.
   A new config flag called `skipPartialRecords` has also been added, through which users can decide whether or not to consume these partial records when `continueOnError` is set.
   
   The PR also adds `$INCOMPLETE_RECORD_KEY$` as a schema column so that users can query which records need to be fixed in the table. The current implementation is not ideal and should ideally go through VirtualColumnProvider. However, that would need some interface changes in the virtual column classes.
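
A minimal sketch of how the two flags described above could interact when the decoder reports an error. It is not the PR's code: the class, the enum, and the marker key `incomplete` are hypothetical names chosen for illustration, and a plain `Map` stands in for `GenericRow`.

```java
import java.util.Map;

// Hypothetical sketch; these names are illustrative and are not Pinot APIs.
final class DecodeErrorPolicySketch {
  enum Outcome { INDEXED, INDEXED_INCOMPLETE, SKIPPED }

  // Placeholder marker key, chosen for this example only.
  static final String INCOMPLETE_MARKER = "incomplete";

  /**
   * Decides what happens to one record.
   *
   * @param row         decoded fields (possibly empty when decoding failed)
   * @param decodeError exception reported by the decoder, or null on success
   */
  static Outcome handle(Map<String, Object> row, Exception decodeError,
      boolean continueOnError, boolean skipPartialRecords) {
    if (decodeError == null) {
      return Outcome.INDEXED;
    }
    if (!continueOnError) {
      // Without opt-in, a decode error stops consumption.
      throw new RuntimeException("Caught exception while decoding record", decodeError);
    }
    if (skipPartialRecords) {
      // The user only wants correct data in the table.
      return Outcome.SKIPPED;
    }
    // Keep the record, but flag it so it can be found later.
    row.put(INCOMPLETE_MARKER, true);
    return Outcome.INDEXED_INCOMPLETE;
  }
}
```

With `continueOnError` unset, the sketch rethrows; with it set, `skipPartialRecords` chooses between dropping the record and indexing it with the marker.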
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r1008688495


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -543,23 +543,32 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {

Review Comment:
   In the transform pipeline, errors are already handled gracefully, so that is not a concern.





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r985442978


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -51,6 +51,10 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
   private boolean _continueOnError;
 
+  @JsonPropertyDescription("If set to true, the records with GenericRow.INCOMPLETE_RECORD_KEY will not be consumed."
+      + "This can be helpful if user only wants to see correct data in the table")

Review Comment:
   This applies to more cases than just the decoder. For example, if an error occurs while converting a column's data type and `continueOnError` is true, we simply replace the value with the default value. Hence it will be easier for users to filter such records with this flag.
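
The data type case mentioned above can be sketched roughly as follows. This is an illustration under assumptions, not Pinot's transformer code: `DefaultOnErrorSketch`, `convertToInt`, and the marker key `incomplete` are hypothetical, and a `Map` stands in for `GenericRow`.

```java
import java.util.Map;

// Hypothetical sketch of "replace with default and flag the row"; not Pinot code.
final class DefaultOnErrorSketch {
  // Placeholder marker key, chosen for this example only.
  static final String INCOMPLETE_MARKER = "incomplete";

  /** Converts the column to an int in place, falling back to defaultValue on failure. */
  static void convertToInt(Map<String, Object> row, String column, int defaultValue,
      boolean continueOnError) {
    Object raw = row.get(column);
    try {
      row.put(column, Integer.parseInt(String.valueOf(raw)));
    } catch (NumberFormatException e) {
      if (!continueOnError) {
        throw new RuntimeException("Caught exception while converting column: " + column, e);
      }
      // Substitute the default and mark the row so it can be filtered later.
      row.put(column, defaultValue);
      row.put(INCOMPLETE_MARKER, true);
    }
  }
}
```

Because the substituted default is indistinguishable from a genuine value, the marker is the only way to tell afterwards that the row was repaired.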



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -543,23 +543,32 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {

Review Comment:
   Makes sense. We can treat metadata and data separately. Are there any cases where metadata errors are critical?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -261,6 +265,17 @@ public static Schema updateSchemaWithTimestampIndexes(Schema schema,
     return newSchema;
   }
 
+  public static Schema updateSchemaWithDerivedFlags(Schema schema) {

Review Comment:
   I don't like this method's name. Suggestions welcome on renaming it (or if it is needed at all)





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r990903032


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -51,6 +51,10 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
   private boolean _continueOnError;
 
+  @JsonPropertyDescription("If set to true, the records with GenericRow.INCOMPLETE_RECORD_KEY will not be consumed."
+      + "This can be helpful if user only wants to see correct data in the table")

Review Comment:
   Based on internal discussions, this flag has been removed from the schema. The approach looks ugly, and until Pinot supports virtual columns with actual data we can't keep the flag.





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r988609896


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -142,7 +144,9 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) {
     Preconditions.checkNotNull(schema);
     Preconditions.checkNotNull(tableConfig);
     _timestampIndexConfigs.putAll(extractTimestampIndexConfigsFromTableConfig(tableConfig));
-    setSchema(updateSchemaWithTimestampIndexes(schema, _timestampIndexConfigs));
+    Schema updatedSchema = updateSchemaWithTimestampIndexes(schema, _timestampIndexConfigs);

Review Comment:
   So based upon the discussions, I am going to remove this method and we are going to go with aggregate metrics.





[GitHub] [pinot] codecov-commenter commented on pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9511:
URL: https://github.com/apache/pinot/pull/9511#issuecomment-1272770369

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9511?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9511](https://codecov.io/gh/apache/pinot/pull/9511?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (89c64ca) into [master](https://codecov.io/gh/apache/pinot/commit/b026d321d335b0e452010bdd51ff9b09ac6f55d2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b026d32) will **decrease** coverage by `34.77%`.
   > The diff coverage is `6.66%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9511       +/-   ##
   =============================================
   - Coverage     70.00%   35.23%   -34.78%     
   + Complexity     4792      189     -4603     
   =============================================
     Files          1921     1927        +6     
     Lines        102349   102763      +414     
     Branches      15530    15600       +70     
   =============================================
   - Hits          71651    36205    -35446     
   - Misses        25636    63490    +37854     
   + Partials       5062     3068     -1994     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `26.07% <6.66%> (+0.16%)` | :arrow_up: |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `15.58% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../local/recordtransformer/CompositeTransformer.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9Db21wb3NpdGVUcmFuc2Zvcm1lci5qYXZh) | `0.00% <ø> (-83.34%)` | :arrow_down: |
   | [...cal/recordtransformer/SanitizationTransformer.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9TYW5pdGl6YXRpb25UcmFuc2Zvcm1lci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...gment/local/segment/creator/TransformPipeline.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvVHJhbnNmb3JtUGlwZWxpbmUuamF2YQ==) | `0.00% <0.00%> (-83.73%)` | :arrow_down: |
   | [...t/creator/impl/SegmentIndexCreationDriverImpl.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9TZWdtZW50SW5kZXhDcmVhdGlvbkRyaXZlckltcGwuamF2YQ==) | `0.00% <0.00%> (-80.11%)` | :arrow_down: |
   | [...ache/pinot/segment/local/utils/IngestionUtils.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9Jbmdlc3Rpb25VdGlscy5qYXZh) | `0.00% <0.00%> (-28.08%)` | :arrow_down: |
   | [...ot/spi/config/table/ingestion/IngestionConfig.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL2luZ2VzdGlvbi9Jbmdlc3Rpb25Db25maWcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `55.45% <18.18%> (-14.66%)` | :arrow_down: |
   | [.../core/segment/processing/mapper/SegmentMapper.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvbWFwcGVyL1NlZ21lbnRNYXBwZXIuamF2YQ==) | `87.87% <25.00%> (-1.19%)` | :arrow_down: |
   | [...manager/realtime/HLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvSExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `74.19% <50.00%> (-8.68%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1158 more](https://codecov.io/gh/apache/pinot/pull/9511/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r988628937


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java:
##########
@@ -51,24 +63,35 @@ public SanitizationTransformer(Schema schema) {
   @Override
   public GenericRow transform(GenericRow record) {
     for (Map.Entry<String, Integer> entry : _stringColumnMaxLengthMap.entrySet()) {
-      String stringColumn = entry.getKey();
-      int maxLength = entry.getValue();
-      Object value = record.getValue(stringColumn);
-      if (value instanceof String) {
-        // Single-valued column
-        String stringValue = (String) value;
-        String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, maxLength);
-        // NOTE: reference comparison
-        //noinspection StringEquality
-        if (sanitizedValue != stringValue) {
-          record.putValue(stringColumn, sanitizedValue);
+      try {
+        String stringColumn = entry.getKey();
+        int maxLength = entry.getValue();
+        Object value = record.getValue(stringColumn);
+        if (value instanceof String) {
+          // Single-valued column
+          String stringValue = (String) value;
+          String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, maxLength);
+          // NOTE: reference comparison
+          //noinspection StringEquality
+          if (sanitizedValue != stringValue) {
+            record.putValue(stringColumn, sanitizedValue);
+          }
+        } else {
+          // Multi-valued column
+          Object[] values = (Object[]) value;
+          int numValues = values.length;
+          for (int i = 0; i < numValues; i++) {
+            values[i] = StringUtil.sanitizeStringValue(values[i].toString(), maxLength);
+          }
         }
-      } else {
-        // Multi-valued column
-        Object[] values = (Object[]) value;
-        int numValues = values.length;
-        for (int i = 0; i < numValues; i++) {
-          values[i] = StringUtil.sanitizeStringValue(values[i].toString(), maxLength);
+      } catch (Exception e) {
+        if (!_continueOnError) {
+          throw new RuntimeException("Caught exception while sanitizing value for column: " + entry.getKey(), e);
+        } else {
+          LOGGER.debug("Caught exception while sanitizing value for column: {}", entry.getKey(), e);
+          //TODO: should put NULL here instead of string `null` and use NullValueTransformer for appropriate value
+          record.putValue(entry.getKey(), "null");
+          record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);

Review Comment:
   Yes. We were using it to increment the metric `ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED` as well as log using `TransformPipeline.Result.getIncompleteRowCount()`
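
To illustrate how a per-row marker can feed a counter of the kind mentioned above, here is a small sketch. It is a stand-in under assumptions and not the implementation of `TransformPipeline.Result`: the class name, the `Map`-based row, and the marker key `incomplete` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transform result that tracks flagged rows; not Pinot code.
final class IncompleteRowCountSketch {
  // Placeholder marker key, chosen for this example only.
  static final String INCOMPLETE_MARKER = "incomplete";

  private final List<Map<String, Object>> rows = new ArrayList<>();
  private int incompleteRowCount;

  /** Adds a transformed row and counts it if a transformer flagged it as incomplete. */
  void addTransformedRow(Map<String, Object> row) {
    rows.add(row);
    if (Boolean.TRUE.equals(row.get(INCOMPLETE_MARKER))) {
      incompleteRowCount++;
    }
  }

  int getRowCount() {
    return rows.size();
  }

  int getIncompleteRowCount() {
    return incompleteRowCount;
  }
}
```

A caller could read `getIncompleteRowCount()` once per batch and add it to a meter or log it, which keeps the metric update out of the individual transformers.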





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r1008689100


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -545,23 +545,31 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {
-        // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
-        // decode error
-        realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                realtimeRowsDroppedMeter);
+        if (_tableConfig.getIngestionConfig() != null
+            && _tableConfig.getIngestionConfig().isContinueOnError()) {
+          decoderResult = null;
+          realtimeRowsDroppedMeter =
+              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  realtimeRowsDroppedMeter);
+        } else {
+          throw new RuntimeException("Caught exception while decoding record", decodedRow.getException());

Review Comment:
   @navina Let me know what I should do here





[GitHub] [pinot] navina commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r1009801177


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -545,23 +545,31 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {
-        // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
-        // decode error
-        realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                realtimeRowsDroppedMeter);
+        if (_tableConfig.getIngestionConfig() != null
+            && _tableConfig.getIngestionConfig().isContinueOnError()) {
+          decoderResult = null;
+          realtimeRowsDroppedMeter =
+              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  realtimeRowsDroppedMeter);
+        } else {
+          throw new RuntimeException("Caught exception while decoding record", decodedRow.getException());

Review Comment:
   Ideally, we would want users to "opt-in" for `continueOnError`. So, I think it should be fine to make this incompatible change. The default behavior should be to stop the ingestion on error. 
   
   @npawar any concerns with this backward incompatible change? 





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r990903032


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -51,6 +51,10 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
   private boolean _continueOnError;
 
+  @JsonPropertyDescription("If set to true, the records with GenericRow.INCOMPLETE_RECORD_KEY will not be consumed."
+      + "This can be helpful if user only wants to see correct data in the table")

Review Comment:
   Based on internal discussions, this flag has been removed from the schema. The approach looks ugly, and until Pinot supports virtual columns with actual data we can keep it as is.





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r1008688878


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -545,23 +545,31 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {
-        // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
-        // decode error
-        realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                realtimeRowsDroppedMeter);
+        if (_tableConfig.getIngestionConfig() != null
+            && _tableConfig.getIngestionConfig().isContinueOnError()) {
+          decoderResult = null;
+          realtimeRowsDroppedMeter =
+              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  realtimeRowsDroppedMeter);
+        } else {
+          throw new RuntimeException("Caught exception while decoding record", decodedRow.getException());

Review Comment:
   Marked the PR as `backward-incompat` because of this line. I think a change made in the 0.10 release caused us to simply return `null` here instead of throwing an exception. However, this PR addresses a `TODO` present in that code:
   ```java
   // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
   // decode error
   ```
   After this change, if `continueOnError` is not set in the table config, we will simply stop consumption with an error. This can break old pipelines that have been silently ignoring errors.
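
The behavior change described above can be sketched with simplified stand-ins. The nested `TableConfig` and `IngestionConfig` classes below are hypothetical miniatures rather than the Pinot classes, and `Integer.parseInt` stands in for the stream decoder.

```java
import java.util.List;

// Hypothetical miniature of the opt-in check; not the Pinot classes of the same names.
final class ContinueOnErrorDefaultSketch {
  static final class IngestionConfig {
    final boolean continueOnError;

    IngestionConfig(boolean continueOnError) {
      this.continueOnError = continueOnError;
    }
  }

  static final class TableConfig {
    final IngestionConfig ingestionConfig; // may be null when the table has no ingestion config

    TableConfig(IngestionConfig ingestionConfig) {
      this.ingestionConfig = ingestionConfig;
    }
  }

  /** A missing ingestion config means the user has not opted in. */
  static boolean isContinueOnError(TableConfig tableConfig) {
    return tableConfig.ingestionConfig != null && tableConfig.ingestionConfig.continueOnError;
  }

  /** Returns the number of dropped rows; throws on the first bad message unless opted in. */
  static int consume(List<String> messages, TableConfig tableConfig) {
    int dropped = 0;
    for (String message : messages) {
      try {
        Integer.parseInt(message); // stand-in for decoding the message
      } catch (NumberFormatException e) {
        if (!isContinueOnError(tableConfig)) {
          throw new RuntimeException("Caught exception while decoding record", e);
        }
        dropped++;
      }
    }
    return dropped;
  }
}
```

A table with no ingestion config, which previously would have dropped the bad row silently, now takes the throwing branch; that is the incompatibility being flagged.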





[GitHub] [pinot] navina commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r985365055


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -543,23 +543,32 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {

Review Comment:
   qq: Currently, stream data decoder decodes key, value, headers and metadata (See `StreamDataDecoderImpl`). Should Pinot treat failure to decode each of these parts similarly? 
   Previously, Pinot's decode failure would only pertain to the value decoding errors. If the user is not interested in the header/metadata/key fields, I wonder if there will be a case where this partially decoded result is still needed and it should not be considered as a failure. What are your thoughts on that?
   
   I think it might be cleaner to wrap this `continueOnError` logic within `StreamDataDecoderImpl` ? Or extend that class and handle? 
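
One way to picture the distinction raised above is a decode result that records value errors and header errors separately, so that only a value failure counts as a decode failure. This is a sketch of one possible design and not how `StreamDataDecoderImpl` behaves; the class, the `value` column, and the `header.` prefix are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of tracking value and header failures separately; not Pinot code.
final class PartialDecodeSketch {
  static final class Result {
    final Map<String, Object> row;
    final Exception valueError;  // failure decoding the message payload
    final Exception headerError; // failure copying headers into the row

    Result(Map<String, Object> row, Exception valueError, Exception headerError) {
      this.row = row;
      this.valueError = valueError;
      this.headerError = headerError;
    }

    /** Only a payload failure is treated as a decode failure in this sketch. */
    boolean isFailure() {
      return valueError != null;
    }
  }

  static Result decode(String value, Map<String, String> headers) {
    Map<String, Object> row = new HashMap<>();
    Exception valueError = null;
    Exception headerError = null;
    try {
      row.put("value", Integer.parseInt(value)); // stand-in for payload decoding
    } catch (NumberFormatException e) {
      valueError = e;
    }
    try {
      // A null headers map triggers a NullPointerException, recorded as a header error.
      headers.forEach((key, headerValue) -> row.put("header." + key, headerValue));
    } catch (RuntimeException e) {
      headerError = e;
    }
    return new Result(row, valueError, headerError);
  }
}
```

Under this design a user who never queries the header fields would still get the row indexed when only the headers are unusable, which is the case the question above is asking about.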
   
   
   
    



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -51,6 +51,10 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
   private boolean _continueOnError;
 
+  @JsonPropertyDescription("If set to true, the records with GenericRow.INCOMPLETE_RECORD_KEY will not be consumed."
+      + "This can be helpful if user only wants to see correct data in the table")

Review Comment:
   Correct me if I am mistaken: we are not really storing the partially decoded record. When decode failure happens, we only store an empty `GenericRow` with the field `INCOMPLETE_RECORD_KEY`. So, calling this `skipPartialRecords` is confusing. Something like `DECODE_FAILED_KEY` seems more appropriate. 
   
   Can you clarify the description -  "If set to true, the records with GenericRow.INCOMPLETE_RECORD_KEY will not be **consumed**." -> consumed for query ? 
   





[GitHub] [pinot] navina commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r985551184


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -543,23 +543,32 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {

Review Comment:
   I think header issues might be more critical, since metadata is usually populated by the stream plugin itself and should be standardized for each plugin.
   I took another look at the code and realized that we just add the header object into the `GenericRow`; no actual decoding happens here. I am not sure how this will be handled in the transform pipeline.
   ```java
   RowMetadata metadata = message.getMetadata();
   if (metadata != null) {
     metadata.getHeaders().getFieldToValueMap()
         .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));

     metadata.getRecordMetadata()
         .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
   }
   ```
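   
   To make the point concrete, here is a self-contained sketch of that copy step using plain maps in place of `GenericRow` and `RowMetadata`. `HeaderCopySketch` and the prefix strings are assumptions for illustration. Whatever object the stream plugin put in the header (for example a raw `byte[]`) lands in the row unchanged:
   
   ```java
   import java.util.Map;

   // Hypothetical stand-in for the copy step; plain maps replace GenericRow and RowMetadata.
   final class HeaderCopySketch {
     // Assumed prefix values, chosen for illustration.
     static final String HEADER_KEY_PREFIX = "__header$";
     static final String METADATA_KEY_PREFIX = "__metadata$";

     static Map<String, Object> copyInto(Map<String, Object> row, Map<String, Object> headers,
         Map<String, String> recordMetadata) {
       // Values are stored as-is under a prefixed column name; nothing is decoded.
       headers.forEach((key, value) -> row.put(HEADER_KEY_PREFIX + key, value));
       recordMetadata.forEach((key, value) -> row.put(METADATA_KEY_PREFIX + key, value));
       return row;
     }
   }
   ```
   
   Since no decoding happens in this step, it is hard to see how it could fail on its own; any type mismatch would only show up later, when the row is transformed or indexed.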
   





[GitHub] [pinot] KKcorps commented on a diff in pull request #9511: Handle exception in realtime decoder gracefully

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9511:
URL: https://github.com/apache/pinot/pull/9511#discussion_r1008688878


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -545,23 +545,31 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       // Decode message
       StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      GenericRow decoderResult;
       if (decodedRow.getException() != null) {
-        // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
-        // decode error
-        realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                realtimeRowsDroppedMeter);
+        if (_tableConfig.getIngestionConfig() != null
+            && _tableConfig.getIngestionConfig().isContinueOnError()) {
+          decoderResult = null;
+          realtimeRowsDroppedMeter =
+              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  realtimeRowsDroppedMeter);
+        } else {
+          throw new RuntimeException("Caught exception while decoding record", decodedRow.getException());

Review Comment:
   Marked the PR as `backward-incompat` because of this line. I think a change made in the 0.10 release caused us to simply return `null` here instead of throwing an exception. However, this PR addresses the `TODO` present in that code:
   ```java
   // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
   // decode error
   ```


