Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2023/01/20 01:41:24 UTC

[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #10121: Add a tracker for end-to-end consumption delay of events.

sajjad-moradi commented on code in PR #10121:
URL: https://github.com/apache/pinot/pull/10121#discussion_r1082029067


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
 
 public class IngestionDelayTracker {
 
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _creationTimeMs = creationTimeMs;

Review Comment:
   `_ingestionTimeMs` is the time when a message was published to the last stream in the pipeline (the one Pinot consumes from).
   `_creationTimeMs` is the time when the message was published to the first stream.
   
   Should we rename them to something like `lastStreamPublishTimeMs` and `firstStreamPublishTimeMs`? At the very least, we need comments that clearly specify what each variable holds.
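   A minimal sketch of the suggested rename, assuming the wrapper stays a static nested class and keeps the original argument order (ingestion time first, creation time second). `IngestionDelayTrackerSketch`, `ingestionDelayMs` and `endToEndDelayMs` are hypothetical names used only for illustration; they are not part of the PR.

```java
/** Hypothetical stand-in for IngestionDelayTracker, showing only the timestamp wrapper. */
class IngestionDelayTrackerSketch {

  // Wraps the two timestamps collected for one ingested event.
  static final class IngestionTimestamps {
    // Epoch ms when the event was published to the last stream, i.e. the one Pinot consumes.
    private final long _lastStreamPublishTimeMs;
    // Epoch ms when the event was published to the first stream in the upstream pipeline.
    private final long _firstStreamPublishTimeMs;

    IngestionTimestamps(long lastStreamPublishTimeMs, long firstStreamPublishTimeMs) {
      _lastStreamPublishTimeMs = lastStreamPublishTimeMs;
      _firstStreamPublishTimeMs = firstStreamPublishTimeMs;
    }
  }

  // Delay measured against the stream Pinot reads from.
  static long ingestionDelayMs(long nowMs, IngestionTimestamps ts) {
    return nowMs - ts._lastStreamPublishTimeMs;
  }

  // End-to-end delay measured against the first publish in the pipeline.
  static long endToEndDelayMs(long nowMs, IngestionTimestamps ts) {
    return nowMs - ts._firstStreamPublishTimeMs;
  }
}
```

   With names like these, the two delay computations read unambiguously without extra comments at every call site.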



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -615,7 +615,8 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       }
     } else if (!prematureExit) {
       // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
+      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), System.currentTimeMillis(),

Review Comment:
   Refactor to read the clock once, `long now = System.currentTimeMillis(); ...`, and pass `now` for both timestamps.
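   Two separate `System.currentTimeMillis()` calls can return different values if the millisecond ticks between them, so the "up-to-date" branch could record a small non-zero difference between the two timestamps. A sketch of the refactor, assuming the three-argument call shape `(ingestionTimeMs, creationTimeMs, partitionGroupId)` seen in the diff and an `int` partition id; `DelayRecorder` and `recordUpToDate` are hypothetical, and the clock is injected as a `LongSupplier` only to make it testable.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: read the clock once so both timestamps are identical. */
class SingleClockReadSketch {

  // Hypothetical stand-in for the realtime table data manager's updateIngestionDelay(...).
  interface DelayRecorder {
    void updateIngestionDelay(long ingestionTimeMs, long creationTimeMs, int partitionGroupId);
  }

  static void recordUpToDate(DelayRecorder recorder, LongSupplier clockMs, int partitionGroupId) {
    // Single clock read: both arguments are guaranteed equal, so both the ingestion
    // delay and the end-to-end delay are recorded as zero at this instant.
    long now = clockMs.getAsLong();
    recorder.updateIngestionDelay(now, now, partitionGroupId);
  }
}
```

   In production code the supplier would simply be `System::currentTimeMillis`.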



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));

Review Comment:
   Ditto. Please remove all other redundant parentheses.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -49,6 +49,17 @@ public interface RowMetadata {
    */
   long getRecordIngestionTimeMs();
 
+  /**
+   * Returns the creation timestamp associated with the record. In cases where the upstream ingestion pipeline is
+   * simple this timestamp matches the result of getRecordIngestionTimeMs();
+   *
+   * Expected to be used for stream-based sources.
+   *
+   * @return timestamp (epoch in milliseconds) when the row was initially created and ingested upstream for the first
+   *         time Long.MIN_VALUE if not available
+   */
+  long getRecordCreationTimeMs();

Review Comment:
   Shouldn't this be a `default` method that returns `Long.MIN_VALUE`, so that it doesn't break the existing implementations for different streams?
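   A sketch of what the reviewer is asking for, using a trimmed-down, hypothetical `RowMetadataSketch` in place of the real `RowMetadata` interface. A Java `default` method supplies a body in the interface itself, so an implementation written before the method existed (here the hypothetical `LegacyStreamRowMetadata`) still compiles and reports "not available".

```java
/** Hypothetical, trimmed-down stand-in for org.apache.pinot.spi.stream.RowMetadata. */
interface RowMetadataSketch {

  long getRecordIngestionTimeMs();

  /**
   * Returns the creation timestamp of the record (epoch ms), or Long.MIN_VALUE if not available.
   * The default body lets existing stream implementations compile unchanged.
   */
  default long getRecordCreationTimeMs() {
    return Long.MIN_VALUE;
  }
}

// An implementation written before getRecordCreationTimeMs() was added still compiles.
class LegacyStreamRowMetadata implements RowMetadataSketch {
  private final long _ingestionTimeMs;

  LegacyStreamRowMetadata(long ingestionTimeMs) {
    _ingestionTimeMs = ingestionTimeMs;
  }

  @Override
  public long getRecordIngestionTimeMs() {
    return _ingestionTimeMs;
  }
}
```

   Streams that do know the original creation time can override the method; all others inherit the sentinel.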



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
 
 public class IngestionDelayTracker {
 
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _creationTimeMs = creationTimeMs;
+    }
+    public final long _ingestionTimeMs;
+    public final long _creationTimeMs;

Review Comment:
   Do these need to be public?
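   Because the nested class is itself private, `public` on its fields grants no access outside the enclosing tracker, and the enclosing class can read the nested class's private members directly anyway. A sketch with everything private, assuming a per-partition map; `TrackerVisibilitySketch`, `update`, `ingestionTimeMs` and `creationTimeMs` are hypothetical and the real tracker's storage may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: the wrapper and its fields stay private to the tracker. */
class TrackerVisibilitySketch {

  // Private nested type: not nameable outside this class, so its fields can be private too.
  private static final class IngestionTimestamps {
    private final long _ingestionTimeMs;
    private final long _creationTimeMs;

    IngestionTimestamps(long ingestionTimeMs, long creationTimeMs) {
      _ingestionTimeMs = ingestionTimeMs;
      _creationTimeMs = creationTimeMs;
    }
  }

  // Hypothetical per-partition storage of the latest timestamps.
  private final Map<Integer, IngestionTimestamps> _latestByPartition = new HashMap<>();

  void update(long ingestionTimeMs, long creationTimeMs, int partition) {
    _latestByPartition.put(partition, new IngestionTimestamps(ingestionTimeMs, creationTimeMs));
  }

  // The enclosing class may read private members of its nested class directly.
  long ingestionTimeMs(int partition) {
    return _latestByPartition.get(partition)._ingestionTimeMs;
  }

  long creationTimeMs(int partition) {
    return _latestByPartition.get(partition)._creationTimeMs;
  }
}
```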



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);

Review Comment:
   Parentheses are not needed around `i + 1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org