Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/18 00:26:51 UTC
[GitHub] [pinot] Jackie-Jiang opened a new pull request #7926: Separate the exception for transform and indexing for consuming records
Jackie-Jiang opened a new pull request #7926:
URL: https://github.com/apache/pinot/pull/7926
Currently the exceptions from transforming and indexing the records are mixed together, which makes debugging hard. This PR splits them into separate exception messages.
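As a rough illustration of the split described above, here is a minimal Java sketch with one try/catch per phase. It is not Pinot's code: the class `TwoPhaseIngestSketch`, its `String` rows, and the `transformer`/`indexer` parameters are hypothetical stand-ins, and the wording of the indexing error message is assumed by analogy with the transform message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the consuming loop; not Pinot's actual classes.
final class TwoPhaseIngestSketch {
  final List<String> errors = new ArrayList<>();
  final List<String> indexed = new ArrayList<>();

  // Phase 1 transforms the rows; phase 2 indexes each transformed row.
  // Each phase has its own try/catch, so the error message names the failing phase.
  void process(List<String> rows, Function<String, String> transformer, Consumer<String> indexer) {
    List<String> transformedRows = new ArrayList<>();
    try {
      for (String row : rows) {
        String transformed = transformer.apply(row);
        if (transformed != null) {
          transformedRows.add(transformed);
        }
      }
    } catch (Exception e) {
      errors.add("Caught exception while transforming the record: " + rows);
    }
    // Rows transformed before a failure are still handed to the indexing phase.
    for (String transformedRow : transformedRows) {
      try {
        indexer.accept(transformedRow);
        indexed.add(transformedRow);
      } catch (Exception e) {
        // Assumed message wording, mirroring the transform message.
        errors.add("Caught exception while indexing the record: " + transformedRow);
      }
    }
  }
}
```

With this shape, an error entry identifies by itself whether the transformer or the indexer rejected the record.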
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
[GitHub] [pinot] Jackie-Jiang merged pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #7926:
URL: https://github.com/apache/pinot/pull/7926
[GitHub] [pinot] codecov-commenter edited a comment on pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#issuecomment-997116495
# [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#7926](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1dcd461) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.09%`.
> The diff coverage is `33.33%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7926/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## master #7926 +/- ##
============================================
- Coverage 71.15% 71.06% -0.10%
- Complexity 4111 4114 +3
============================================
Files 1593 1593
Lines 82365 82375 +10
Branches 12270 12271 +1
============================================
- Hits 58609 58536 -73
- Misses 19806 19892 +86
+ Partials 3950 3947 -3
```
| Flag | Coverage Δ | |
|---|---|---|
| integration1 | `29.04% <33.33%> (-0.07%)` | :arrow_down: |
| integration2 | `27.63% <33.33%> (+0.04%)` | :arrow_up: |
| unittests1 | `68.05% <0.00%> (-0.02%)` | :arrow_down: |
| unittests2 | `14.33% <0.00%> (-0.01%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...ocal/recordtransformer/ComplexTypeTransformer.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9Db21wbGV4VHlwZVRyYW5zZm9ybWVyLmphdmE=) | `55.67% <ø> (ø)` | |
| [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `71.81% <33.33%> (-0.03%)` | :arrow_down: |
| [...data/manager/realtime/DefaultSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvRGVmYXVsdFNlZ21lbnRDb21taXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
| [...ller/helix/core/minion/TaskTypeMetricsUpdater.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9UYXNrVHlwZU1ldHJpY3NVcGRhdGVyLmphdmE=) | `80.00% <0.00%> (-20.00%)` | :arrow_down: |
| [...er/api/resources/LLCSegmentCompletionHandlers.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL0xMQ1NlZ21lbnRDb21wbGV0aW9uSGFuZGxlcnMuamF2YQ==) | `43.56% <0.00%> (-18.82%)` | :arrow_down: |
| [...data/manager/realtime/SegmentCommitterFactory.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvU2VnbWVudENvbW1pdHRlckZhY3RvcnkuamF2YQ==) | `88.23% <0.00%> (-11.77%)` | :arrow_down: |
| [...altime/ServerSegmentCompletionProtocolHandler.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL3JlYWx0aW1lL1NlcnZlclNlZ21lbnRDb21wbGV0aW9uUHJvdG9jb2xIYW5kbGVyLmphdmE=) | `51.42% <0.00%> (-6.67%)` | :arrow_down: |
| [.../helix/core/minion/MinionInstancesCleanupTask.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9NaW5pb25JbnN0YW5jZXNDbGVhbnVwVGFzay5qYXZh) | `77.27% <0.00%> (-4.55%)` | :arrow_down: |
| [...core/query/executor/ServerQueryExecutorV1Impl.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9leGVjdXRvci9TZXJ2ZXJRdWVyeUV4ZWN1dG9yVjFJbXBsLmphdmE=) | `83.33% <0.00%> (-4.42%)` | :arrow_down: |
| [...pinot/core/query/request/context/TimerContext.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvVGltZXJDb250ZXh0LmphdmE=) | `91.66% <0.00%> (-4.17%)` | :arrow_down: |
| ... and [23 more](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...1dcd461](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [pinot] mcvsubbu commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771892778
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
.decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
if (decodedRow != null) {
+ List<GenericRow> transformedRows = new ArrayList<>();
try {
if (_complexTypeTransformer != null) {
// TODO: consolidate complex type transformer into composite type transformer
decodedRow = _complexTypeTransformer.transform(decodedRow);
}
- if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
- for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
- GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
- if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+ if (rows != null) {
+ for (GenericRow row : rows) {
+ GenericRow transformedRow = _recordTransformer.transform(row);
+ if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} else {
GenericRow transformedRow = _recordTransformer.transform(decodedRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} catch (Exception e) {
+ _numRowsErrored++;
String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
_segmentLogger.error(errorMessage, e);
- _numRowsErrored++;
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ }
+ for (GenericRow transformedRow : transformedRows) {
+ try {
+ canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ indexedMessageCount++;
+ realtimeRowsConsumedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
Review comment:
Ah, I see now. In the original code, we bump the metrics independently: once for each transformed row if there are multiple transformed rows, otherwise once for the consumed row. Your new code preserves the same behavior. Thanks for the clarification.
[GitHub] [pinot] mcvsubbu commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771755746
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
.decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
if (decodedRow != null) {
+ List<GenericRow> transformedRows = new ArrayList<>();
try {
if (_complexTypeTransformer != null) {
// TODO: consolidate complex type transformer into composite type transformer
decodedRow = _complexTypeTransformer.transform(decodedRow);
}
- if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
- for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
- GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
- if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+ if (rows != null) {
+ for (GenericRow row : rows) {
+ GenericRow transformedRow = _recordTransformer.transform(row);
+ if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} else {
GenericRow transformedRow = _recordTransformer.transform(decodedRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} catch (Exception e) {
+ _numRowsErrored++;
String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
_segmentLogger.error(errorMessage, e);
- _numRowsErrored++;
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ }
+ for (GenericRow transformedRow : transformedRows) {
+ try {
+ canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ indexedMessageCount++;
+ realtimeRowsConsumedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
Review comment:
One consumed row can transform into multiple transformed rows, so we should bump this metric after we have consumed a row (and before we transform it) -- whether there are exceptions or not.
[GitHub] [pinot] codecov-commenter commented on pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#issuecomment-997116495
# [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#7926](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1dcd461) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.92%`.
> The diff coverage is `33.33%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7926/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## master #7926 +/- ##
============================================
- Coverage 71.15% 70.22% -0.93%
- Complexity 4111 4114 +3
============================================
Files 1593 1593
Lines 82365 82375 +10
Branches 12270 12271 +1
============================================
- Hits 58609 57851 -758
- Misses 19806 20573 +767
- Partials 3950 3951 +1
```
| Flag | Coverage Δ | |
|---|---|---|
| integration1 | `29.04% <33.33%> (-0.07%)` | :arrow_down: |
| integration2 | `?` | |
| unittests1 | `68.05% <0.00%> (-0.02%)` | :arrow_down: |
| unittests2 | `14.33% <0.00%> (-0.01%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...ocal/recordtransformer/ComplexTypeTransformer.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWNvcmR0cmFuc2Zvcm1lci9Db21wbGV4VHlwZVRyYW5zZm9ybWVyLmphdmE=) | `55.67% <ø> (ø)` | |
| [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `70.46% <33.33%> (-1.37%)` | :arrow_down: |
| [...t/core/plan/StreamingInstanceResponsePlanNode.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ0luc3RhbmNlUmVzcG9uc2VQbGFuTm9kZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...ore/operator/streaming/StreamingResponseUtils.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nUmVzcG9uc2VVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...ager/realtime/PeerSchemeSplitSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUGVlclNjaGVtZVNwbGl0U2VnbWVudENvbW1pdHRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...pache/pinot/common/utils/grpc/GrpcQueryClient.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZ3JwYy9HcnBjUXVlcnlDbGllbnQuamF2YQ==) | `0.00% <0.00%> (-94.74%)` | :arrow_down: |
| [...he/pinot/core/plan/StreamingSelectionPlanNode.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ1NlbGVjdGlvblBsYW5Ob2RlLmphdmE=) | `0.00% <0.00%> (-88.89%)` | :arrow_down: |
| [...ator/streaming/StreamingSelectionOnlyOperator.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nU2VsZWN0aW9uT25seU9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-87.81%)` | :arrow_down: |
| [...data/manager/realtime/DefaultSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvRGVmYXVsdFNlZ21lbnRDb21taXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
| [...ller/api/access/BasicAuthAccessControlFactory.java](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvYWNjZXNzL0Jhc2ljQXV0aEFjY2Vzc0NvbnRyb2xGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (-80.00%)` | :arrow_down: |
| ... and [93 more](https://codecov.io/gh/apache/pinot/pull/7926/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...1dcd461](https://codecov.io/gh/apache/pinot/pull/7926?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7926: Separate the exception for transform and indexing for consuming records
Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7926:
URL: https://github.com/apache/pinot/pull/7926#discussion_r771771849
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -494,50 +494,59 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
.decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
if (decodedRow != null) {
+ List<GenericRow> transformedRows = new ArrayList<>();
try {
if (_complexTypeTransformer != null) {
// TODO: consolidate complex type transformer into composite type transformer
decodedRow = _complexTypeTransformer.transform(decodedRow);
}
- if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
- for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
- GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
- if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ Collection<GenericRow> rows = (Collection<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+ if (rows != null) {
+ for (GenericRow row : rows) {
+ GenericRow transformedRow = _recordTransformer.transform(row);
+ if (transformedRow != null && IngestionUtils.shouldIngestRow(row)) {
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} else {
GenericRow transformedRow = _recordTransformer.transform(decodedRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- realtimeRowsConsumedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
- realtimeRowsConsumedMeter);
- indexedMessageCount++;
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ transformedRows.add(transformedRow);
} else {
- realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ realtimeRowsDroppedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}
}
} catch (Exception e) {
+ _numRowsErrored++;
String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
_segmentLogger.error(errorMessage, e);
- _numRowsErrored++;
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ }
+ for (GenericRow transformedRow : transformedRows) {
+ try {
+ canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ indexedMessageCount++;
+ realtimeRowsConsumedMeter =
+ _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
Review comment:
This keeps the existing behavior, where each transformed row bumps this metric once. I feel we should keep the existing behavior.
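To make the two counting options in this thread concrete, here is a small hypothetical Java sketch. The class and method names are illustrative only and do not exist in Pinot; each inner list stands for the transformed rows produced from one consumed message.

```java
import java.util.List;

// Hypothetical illustration of the two counting options discussed in the review.
final class RowsConsumedCountingSketch {
  // Existing behavior per the thread: bump the meter once per transformed row.
  static long countPerTransformedRow(List<List<String>> consumedMessages) {
    long meter = 0;
    for (List<String> transformedRows : consumedMessages) {
      meter += transformedRows.size();
    }
    return meter;
  }

  // Alternative raised in review: bump once per consumed message, before transforming.
  static long countPerConsumedMessage(List<List<String>> consumedMessages) {
    return consumedMessages.size();
  }
}
```

The two counts agree only when every consumed message yields exactly one transformed row; once a message fans out into several rows, the per-transformed-row count is larger.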