Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/18 00:26:51 UTC

[GitHub] [pinot] Jackie-Jiang opened a new pull request #7926: Separate the exception for transform and indexing for consuming records

Jackie-Jiang opened a new pull request #7926:
URL: https://github.com/apache/pinot/pull/7926


   Currently, exceptions thrown while transforming records and while indexing them are reported together, which makes debugging hard. This PR splits them into separate exception messages.
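
As a rough illustration of the split described above, here is a minimal, self-contained sketch, not the PR's actual code. The class `TwoPhaseIngestSketch`, its `Result` holder, and the `process` helper are hypothetical names introduced only for this example, and the wording of the indexing error message is an assumption. The idea is that each decoded record is transformed inside one `try` block, and the resulting rows are indexed inside a second one, so each failure gets its own message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class TwoPhaseIngestSketch {
  /** Hypothetical holder for the outcome, so the two failure kinds can be told apart. */
  public static final class Result {
    public final List<String> errors = new ArrayList<>();
    public int indexedCount = 0;
  }

  /**
   * Hypothetical stand-in for the body of a consume loop: transform first, then index,
   * with a separate try/catch (and a separate error message) for each phase.
   */
  public static <R> Result process(List<R> decodedRows, Function<R, R> transformer, Consumer<R> indexer) {
    Result result = new Result();
    for (R decodedRow : decodedRows) {
      List<R> transformedRows = new ArrayList<>();
      try {
        R transformed = transformer.apply(decodedRow);
        if (transformed != null) {
          transformedRows.add(transformed);
        }
      } catch (Exception e) {
        result.errors.add("Caught exception while transforming the record: " + decodedRow);
      }
      // Rows that failed to transform never reach this loop.
      for (R transformedRow : transformedRows) {
        try {
          indexer.accept(transformedRow);
          result.indexedCount++;
        } catch (Exception e) {
          // Assumed wording; only the transform message is taken from the diff in this thread.
          result.errors.add("Caught exception while indexing the record: " + transformedRow);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("ok", "bad-transform", "bad-index");
    Result result = process(
        rows,
        row -> {
          if (row.equals("bad-transform")) {
            throw new IllegalArgumentException("cannot transform");
          }
          return row.toUpperCase();
        },
        row -> {
          if (row.equals("BAD-INDEX")) {
            throw new IllegalStateException("cannot index");
          }
        });
    System.out.println("indexed=" + result.indexedCount);
    result.errors.forEach(System.out::println);
  }
}
```

With this shape, a log reader can tell from the message alone whether a record failed before or after it reached the segment.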


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang merged pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #7926:
URL: https://github.com/apache/pinot/pull/7926


   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#issuecomment-997116495


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7926](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1dcd461) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.09%`.
   > The diff coverage is `33.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7926/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7926      +/-   ##
   ============================================
   - Coverage     71.15%   71.06%   -0.10%     
   - Complexity     4111     4114       +3     
   ============================================
     Files          1593     1593              
     Lines         82365    82375      +10     
     Branches      12270    12271       +1     
   ============================================
   - Hits          58609    58536      -73     
   - Misses        19806    19892      +86     
   + Partials       3950     3947       -3     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.04% <33.33%> (-0.07%)` | :arrow_down: |
   | integration2 | `27.63% <33.33%> (+0.04%)` | :arrow_up: |
   | unittests1 | `68.05% <0.00%> (-0.02%)` | :arrow_down: |
   | unittests2 | `14.33% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ocal/recordtransformer/ComplexTypeTransformer.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9Db21wbGV4VHlwZVRyYW5zZm9ybWVyLmphdmE=) | `55.67% <ø> (ø)` | |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `71.81% <33.33%> (-0.03%)` | :arrow_down: |
   | [...data/manager/realtime/DefaultSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvRGVmYXVsdFNlZ21lbnRDb21taXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
   | [...ller/helix/core/minion/TaskTypeMetricsUpdater.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9UYXNrVHlwZU1ldHJpY3NVcGRhdGVyLmphdmE=) | `80.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...er/api/resources/LLCSegmentCompletionHandlers.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL0xMQ1NlZ21lbnRDb21wbGV0aW9uSGFuZGxlcnMuamF2YQ==) | `43.56% <0.00%> (-18.82%)` | :arrow_down: |
   | [...data/manager/realtime/SegmentCommitterFactory.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvU2VnbWVudENvbW1pdHRlckZhY3RvcnkuamF2YQ==) | `88.23% <0.00%> (-11.77%)` | :arrow_down: |
   | [...altime/ServerSegmentCompletionProtocolHandler.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL3JlYWx0aW1lL1NlcnZlclNlZ21lbnRDb21wbGV0aW9uUHJvdG9jb2xIYW5kbGVyLmphdmE=) | `51.42% <0.00%> (-6.67%)` | :arrow_down: |
   | [.../helix/core/minion/MinionInstancesCleanupTask.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9NaW5pb25JbnN0YW5jZXNDbGVhbnVwVGFzay5qYXZh) | `77.27% <0.00%> (-4.55%)` | :arrow_down: |
   | [...core/query/executor/ServerQueryExecutorV1Impl.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9leGVjdXRvci9TZXJ2ZXJRdWVyeUV4ZWN1dG9yVjFJbXBsLmphdmE=) | `83.33% <0.00%> (-4.42%)` | :arrow_down: |
   | [...pinot/core/query/request/context/TimerContext.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvVGltZXJDb250ZXh0LmphdmE=) | `91.66% <0.00%> (-4.17%)` | :arrow_down: |
   | ... and [23 more](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...1dcd461](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] mcvsubbu commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771892778



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
           .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
+        List<GenericRow> transformedRows = new ArrayList<>();
         try {
           if (_complexTypeTransformer != null) {
             // TODO: consolidate complex type transformer into composite type transformer
             decodedRow = _complexTypeTransformer.transform(decodedRow);
           }
-          if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-              GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
-              if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-                realtimeRowsConsumedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                        realtimeRowsConsumedMeter);
-                indexedMessageCount++;
-                canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+          Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+          if (rows != null) {
+            for (GenericRow row : rows) {
+              GenericRow transformedRow = _recordTransformer.transform(row);
+              if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+                transformedRows.add(transformedRow);
               } else {
-                realtimeRowsDroppedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                realtimeRowsDroppedMeter =
+                    _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                         realtimeRowsDroppedMeter);
               }
             }
           } else {
             GenericRow transformedRow = _recordTransformer.transform(decodedRow);
             if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-              realtimeRowsConsumedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                      realtimeRowsConsumedMeter);
-              indexedMessageCount++;
-              canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+              transformedRows.add(transformedRow);
             } else {
-              realtimeRowsDroppedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+              realtimeRowsDroppedMeter =
+                  _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                       realtimeRowsDroppedMeter);
             }
           }
         } catch (Exception e) {
+          _numRowsErrored++;
           String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
-          _numRowsErrored++;
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        for (GenericRow transformedRow : transformedRows) {
+          try {
+            canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+            indexedMessageCount++;
+            realtimeRowsConsumedMeter =
+                _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,

Review comment:
       Ah, I see now. The original code bumps the metric independently in each branch: once per transformed row when a record expands into multiple transformed rows, otherwise once for the consumed row. Your new code preserves that behavior. Thanks for the clarification.
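
The counting behavior described in this comment can be sketched as follows. This is a simplified illustration under stated assumptions, not Pinot code: `RowsConsumedMeterSketch` and `indexMessage` are hypothetical names, a plain `long` field stands in for the `REALTIME_ROWS_CONSUMED` meter, and counting an indexing failure as an errored row is an assumption, since that part of the diff is not shown in this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class RowsConsumedMeterSketch {
  // Hypothetical stand-in for the REALTIME_ROWS_CONSUMED meter.
  private long _rowsConsumed;
  // Assumed error counter; how the real code accounts for indexing failures is not shown in the thread.
  private int _rowsErrored;

  /**
   * Indexes all transformed rows produced from one decoded message.
   * The meter is bumped once per row, and only after the indexer returns normally.
   */
  public void indexMessage(List<String> transformedRows, Consumer<String> indexer) {
    for (String transformedRow : transformedRows) {
      try {
        indexer.accept(transformedRow);
        _rowsConsumed++;
      } catch (Exception e) {
        _rowsErrored++;
      }
    }
  }

  public long getRowsConsumed() {
    return _rowsConsumed;
  }

  public int getRowsErrored() {
    return _rowsErrored;
  }

  public static void main(String[] args) {
    RowsConsumedMeterSketch sketch = new RowsConsumedMeterSketch();
    Consumer<String> indexer = row -> {
      if (row.isEmpty()) {
        throw new IllegalStateException("cannot index an empty row");
      }
    };
    // A message that maps to a single row bumps the meter once.
    sketch.indexMessage(Collections.singletonList("a"), indexer);
    // A message that expands into three rows bumps it once per row.
    sketch.indexMessage(Arrays.asList("b", "c", "d"), indexer);
    // A row that fails to index is not counted as consumed.
    sketch.indexMessage(Collections.singletonList(""), indexer);
    System.out.println("consumed=" + sketch.getRowsConsumed() + ", errored=" + sketch.getRowsErrored());
  }
}
```

Because both the single-row and multi-row cases feed the same loop, the meter ends up counting indexed rows rather than consumed messages, which matches the per-branch behavior the comment attributes to the original code.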






[GitHub] [pinot] mcvsubbu commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771755746



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
           .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
+        List<GenericRow> transformedRows = new ArrayList<>();
         try {
           if (_complexTypeTransformer != null) {
             // TODO: consolidate complex type transformer into composite type transformer
             decodedRow = _complexTypeTransformer.transform(decodedRow);
           }
-          if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-              GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
-              if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-                realtimeRowsConsumedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                        realtimeRowsConsumedMeter);
-                indexedMessageCount++;
-                canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+          Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+          if (rows != null) {
+            for (GenericRow row : rows) {
+              GenericRow transformedRow = _recordTransformer.transform(row);
+              if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+                transformedRows.add(transformedRow);
               } else {
-                realtimeRowsDroppedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                realtimeRowsDroppedMeter =
+                    _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                         realtimeRowsDroppedMeter);
               }
             }
           } else {
             GenericRow transformedRow = _recordTransformer.transform(decodedRow);
             if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-              realtimeRowsConsumedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                      realtimeRowsConsumedMeter);
-              indexedMessageCount++;
-              canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+              transformedRows.add(transformedRow);
             } else {
-              realtimeRowsDroppedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+              realtimeRowsDroppedMeter =
+                  _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                       realtimeRowsDroppedMeter);
             }
           }
         } catch (Exception e) {
+          _numRowsErrored++;
           String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
-          _numRowsErrored++;
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        for (GenericRow transformedRow : transformedRows) {
+          try {
+            canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+            indexedMessageCount++;
+            realtimeRowsConsumedMeter =
+                _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,

Review comment:
       One consumed row can transform into multiple transformed rows, so we should bump this metric after we have consumed a row (and before we transform it) -- whether there are exceptions or not.






[GitHub] [pinot] codecov-commenter commented on pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#issuecomment-997116495


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7926](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1dcd461) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.92%`.
   > The diff coverage is `33.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7926/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7926      +/-   ##
   ============================================
   - Coverage     71.15%   70.22%   -0.93%     
   - Complexity     4111     4114       +3     
   ============================================
     Files          1593     1593              
     Lines         82365    82375      +10     
     Branches      12270    12271       +1     
   ============================================
   - Hits          58609    57851     -758     
   - Misses        19806    20573     +767     
   - Partials       3950     3951       +1     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.04% <33.33%> (-0.07%)` | :arrow_down: |
   | integration2 | `?` | |
   | unittests1 | `68.05% <0.00%> (-0.02%)` | :arrow_down: |
   | unittests2 | `14.33% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ocal/recordtransformer/ComplexTypeTransformer.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9Db21wbGV4VHlwZVRyYW5zZm9ybWVyLmphdmE=) | `55.67% <ø> (ø)` | |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `70.46% <33.33%> (-1.37%)` | :arrow_down: |
   | [...t/core/plan/StreamingInstanceResponsePlanNode.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ0luc3RhbmNlUmVzcG9uc2VQbGFuTm9kZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/operator/streaming/StreamingResponseUtils.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nUmVzcG9uc2VVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ager/realtime/PeerSchemeSplitSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUGVlclNjaGVtZVNwbGl0U2VnbWVudENvbW1pdHRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/pinot/common/utils/grpc/GrpcQueryClient.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZ3JwYy9HcnBjUXVlcnlDbGllbnQuamF2YQ==) | `0.00% <0.00%> (-94.74%)` | :arrow_down: |
   | [...he/pinot/core/plan/StreamingSelectionPlanNode.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ1NlbGVjdGlvblBsYW5Ob2RlLmphdmE=) | `0.00% <0.00%> (-88.89%)` | :arrow_down: |
   | [...ator/streaming/StreamingSelectionOnlyOperator.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nU2VsZWN0aW9uT25seU9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-87.81%)` | :arrow_down: |
   | [...data/manager/realtime/DefaultSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvRGVmYXVsdFNlZ21lbnRDb21taXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
   | [...ller/api/access/BasicAuthAccessControlFactory.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvYWNjZXNzL0Jhc2ljQXV0aEFjY2Vzc0NvbnRyb2xGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
   | ... and [93 more](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...1dcd461](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771771849



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
           .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
+        List<GenericRow> transformedRows = new ArrayList<>();
         try {
           if (_complexTypeTransformer != null) {
             // TODO: consolidate complex type transformer into composite type transformer
             decodedRow = _complexTypeTransformer.transform(decodedRow);
           }
-          if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-              GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
-              if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-                realtimeRowsConsumedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                        realtimeRowsConsumedMeter);
-                indexedMessageCount++;
-                canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+          Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+          if (rows != null) {
+            for (GenericRow row : rows) {
+              GenericRow transformedRow = _recordTransformer.transform(row);
+              if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+                transformedRows.add(transformedRow);
               } else {
-                realtimeRowsDroppedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                realtimeRowsDroppedMeter =
+                    _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                         realtimeRowsDroppedMeter);
               }
             }
           } else {
             GenericRow transformedRow = _recordTransformer.transform(decodedRow);
             if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-              realtimeRowsConsumedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
-                      realtimeRowsConsumedMeter);
-              indexedMessageCount++;
-              canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+              transformedRows.add(transformedRow);
             } else {
-              realtimeRowsDroppedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+              realtimeRowsDroppedMeter =
+                  _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                       realtimeRowsDroppedMeter);
             }
           }
         } catch (Exception e) {
+          _numRowsErrored++;
           String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
-          _numRowsErrored++;
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        for (GenericRow transformedRow : transformedRows) {
+          try {
+            canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+            indexedMessageCount++;
+            realtimeRowsConsumedMeter =
+                _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,

Review comment:
       This keeps the existing behavior, where each transformed row bumps this metric once. I feel we should keep it that way.
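
The split made in the hunk above can be sketched in isolation. This is a minimal illustration only, not the Pinot code: `TwoPhaseIngestSketch`, `Result`, `process`, the `Function` transformer and the `Consumer` indexer are all hypothetical stand-ins, and the wording of the indexing error message is an assumption, since the quoted hunk is cut off before the indexing `catch` block. It shows a transform phase guarded by one `try`, followed by a per-row indexing `try` that counts a row only after it has been indexed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class TwoPhaseIngestSketch {

  /** Outcome of processing one decoded message (hypothetical holder, not a Pinot class). */
  public static final class Result {
    public int indexed;
    public final List<String> errors = new ArrayList<>();
  }

  /**
   * Transforms all rows first, then indexes each transformed row separately,
   * so that a failure is attributed to the phase in which it happened.
   */
  public static <R> Result process(List<R> rows, Function<R, R> transformer, Consumer<R> indexer) {
    Result result = new Result();
    // Declared outside the try block so rows transformed before a failure are still indexed.
    List<R> transformedRows = new ArrayList<>();

    // Phase 1: transform. Any exception here is reported as a transform error.
    try {
      for (R row : rows) {
        R transformed = transformer.apply(row);
        if (transformed != null) {
          transformedRows.add(transformed);
        }
      }
    } catch (Exception e) {
      result.errors.add("Caught exception while transforming the record: " + rows);
    }

    // Phase 2: index. Each row has its own try block, so one bad row does not
    // stop the others, and the counter is bumped once per indexed row.
    for (R transformed : transformedRows) {
      try {
        indexer.accept(transformed);
        result.indexed++;
      } catch (Exception e) {
        // Assumed message wording; the original hunk does not show this catch block.
        result.errors.add("Caught exception while indexing the record: " + transformed);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The indexer rejects "B"; the other two rows are still indexed.
    Result result = process(List.of("a", "b", "c"), s -> s.toUpperCase(), s -> {
      if (s.equals("B")) {
        throw new IllegalStateException("cannot index " + s);
      }
    });
    System.out.println("indexed=" + result.indexed + " errors=" + result.errors);
  }
}
```

Incrementing the counter after the indexing call, as the diff does with `indexedMessageCount` and `REALTIME_ROWS_CONSUMED`, means a row that fails to index is not counted as consumed.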



