Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2023/01/14 02:29:13 UTC

[GitHub] [gobblin] umustafi opened a new pull request, #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

umustafi opened a new pull request, #3625:
URL: https://github.com/apache/gobblin/pull/3625

   … change stream events
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [X] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1766 
   
   
   ### Description
   - [X] Here are some details about my PR, including screenshots (if applicable):
   Modify the change stream event to add a transaction identifier and a produceTimestamp. Use the transaction identifier instead of the timestamp to dedupe events, since "at least once" delivery may result in multiple change stream events with different timestamps for the same row update in MySQL. Instead, use the timestamp to emit a metric measuring the time between producing the event and consuming it on our monitor.
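
   As a minimal sketch of the consumer-side idea (names follow the diffs discussed below, so treat this as illustrative rather than the final implementation):

   ```
   // Illustrative: dedupe on the transaction id, which is stable across redeliveries,
   // and use the produce timestamp only for the produce-to-consume delay metric.
   String tid = value.getChangeEventIdentifier().getTxId();
   Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();

   String changeIdentifier = tid + key;  // duplicate deliveries share a txId, not a timestamp
   long produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;
   ```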
   
   ### Tests
   - [X] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [X] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1083517225


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = getProduceToConsumeLag(produceTimestamp);

Review Comment:
   This is true, but it's not a limitation we're as worried about. The volume of messages for flow creates/updates/kills is realistically under 100 per minute, in reality probably only in the tens. If our volume were much higher, it would make sense to report consumption at the per-partition level. This should give us an idea of our rate of consumption.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1080602171


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;
+    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}",
+        flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeLagValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = produceTimestamp + key;

Review Comment:
   Yes good catch.





[GitHub] [gobblin] codecov-commenter commented on pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#issuecomment-1382641303

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3625](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0c7e1db) into [master](https://codecov.io/gh/apache/gobblin/commit/85cd1f9875ba64bd059b70a4d5fc40a38383d326?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (85cd1f9) will **decrease** coverage by `2.68%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3625      +/-   ##
   ============================================
   - Coverage     46.44%   43.76%   -2.69%     
   + Complexity    10621     2063    -8558     
   ============================================
     Files          2130      409    -1721     
     Lines         83303    17626   -65677     
     Branches       9282     2152    -7130     
   ============================================
   - Hits          38692     7714   -30978     
   + Misses        41039     9054   -31985     
   + Partials       3572      858    -2714     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/gobblin/cluster/GobblinHelixTask.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4VGFzay5qYXZh) | `64.51% <0.00%> (-2.16%)` | :arrow_down: |
   | [...apache/gobblin/runtime/metrics/RuntimeMetrics.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvbWV0cmljcy9SdW50aW1lTWV0cmljcy5qYXZh) | | |
   | [...ervice/monitoring/DagActionStoreChangeMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9EYWdBY3Rpb25TdG9yZUNoYW5nZU1vbml0b3IuamF2YQ==) | | |
   | [...lin/service/monitoring/SpecStoreChangeMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9TcGVjU3RvcmVDaGFuZ2VNb25pdG9yLmphdmE=) | | |
   | [...java/org/apache/gobblin/http/ApacheHttpClient.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4taHR0cC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9odHRwL0FwYWNoZUh0dHBDbGllbnQuamF2YQ==) | | |
   | [...apache/gobblin/metrics/event/sla/SlaEventKeys.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9ldmVudC9zbGEvU2xhRXZlbnRLZXlzLmphdmE=) | | |
   | [...he/gobblin/kafka/client/Kafka08ConsumerClient.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtMDgvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4va2Fma2EvY2xpZW50L0thZmthMDhDb25zdW1lckNsaWVudC5qYXZh) | | |
   | [...in/source/extractor/watermark/SimpleWatermark.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3Ivd2F0ZXJtYXJrL1NpbXBsZVdhdGVybWFyay5qYXZh) | | |
   | [.../main/java/org/apache/gobblin/fork/CopyHelper.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZm9yay9Db3B5SGVscGVyLmphdmE=) | | |
   | [.../apache/gobblin/yarn/GobblinApplicationMaster.java](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbkFwcGxpY2F0aW9uTWFzdGVyLmphdmE=) | | |
   | ... and [1712 more](https://codecov.io/gh/apache/gobblin/pull/3625?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1084527436


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   I agree on not complicating.  here was my reasoning:
   * multiple incoming partitions get combined into a single input
   * each partition is read into a blocking queue, so in cases of overload, the partition can't drain completely to its queue
   * under such conditions, reading multiple partitions more closely resembles round-robin
   * round-robin under overload conditions, with additionally skewed partitions, would lead the loaded partitions to fall farther behind the non-skewed ones
   
   thus, under overload and partition skew, it seems possible to have non-uniform delay.
   
   in practice, not sure how likely both those would be.  if not very, that's fair.
   
   measurement-wise, against non-uniform delay, the current impl's essentially randomly chosen value would make the metric jump around, whereas a high-water mark or percentile would consistently track the worst case for each collection interval.
   
   anyway, if my reasoning above checks out, and the conditions seem possible to arise, easy to update to the `AtomicLong`.  if they seem improbable, keep code as-is, but perhaps add a warning/comment about where it breaks down.  seem reasonable?
   





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1080610719


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -171,5 +178,6 @@ protected void createMetrics() {
     this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   Yea I agree, there's potential to refactor these two monitor classes by creating a base class with the repeated code and then handling the store-specific logic. I'll keep that in mind for future improvements.





[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081058477


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -333,4 +333,8 @@ public void run() {
       }
     }
   }
+
+  public Long getProduceToConsumeLag(Long produceTimestamp) {
+    return System.currentTimeMillis() - produceTimestamp;
+  }

Review Comment:
   this may not be the right place for this method.  the HLC is used quite widely and no uses other than these recently introduced change monitoring consumers have a need for this.  the method should live closer to them, either in their common base class or, in the absence of an exclusively shared ancestor, duplicated in each.



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",
+    "type" : "string",
+    "doc" : "ID to uniquely identify the transaction. Used for identifying duplicate messages with different timestamps for the same transaction.",
+    "compliance" : "NONE"
+  }, {
+    "name" : "produceTimestamp",
     "type" : "long",
-    "doc" : "Time the change occurred",
+    "doc" : "Time the change was produced to topic (separate than the time of the update to the store)",

Review Comment:
   BTW, reading more closely this time... I'm a big believer in specifying both in the field name and the doc string whether we're talking secs, millis, nanos, etc.  misalignment there quickly leads to bogus results



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -176,6 +185,7 @@ protected void createMetrics() {
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   this has me doing a double-take: is the gauge's value being read by something reaching back into the change monitor instance to access the `private Long produceToConsumeLagValue`?  aside from being hard to follow, that's not thread-safe (since I presume the value for the gauge is being read from a different thread).
   
   to keep as-is, be sure to declare the instance variable `volatile` - see: https://stackoverflow.com/a/3038233
   
   better still, however, would be to invoke the gauge within the `processMessage` call stack to explicitly pass it whatever value is seen at that time.
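   
   for illustration, a minimal sketch of the volatile-field pattern (the field and metric names here are placeholders, not necessarily the PR's final ones):
   
   ```
   // Read by the metrics reporter thread but written by the consumer thread,
   // so the field must be volatile for cross-thread visibility.
   private volatile long produceToConsumeDelayValueMillis = 0L;
   
   protected void createMetrics() {
     // the lambda re-reads the volatile field each time the gauge is sampled
     this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(
         "produce.to.consume.delay.millis", () -> produceToConsumeDelayValueMillis);
   }
   ```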



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = getProduceToConsumeLag(produceTimestamp);

Review Comment:
   is this restricted to single-partitioned streams?  I don't see any logic to reconcile the most-recently observed value against what may be measured on other partitions.  in general, delay arises at partition-granularity.  to aggregate beyond one value per partition, we might use the maximum.
   
   at present, a single partition may be sufficient for the expected CDC throughput.  still, let's document this assumption.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -333,4 +333,8 @@ public void run() {
       }
     }
   }
+
+  public Long getProduceToConsumeLag(Long produceTimestamp) {
+    return System.currentTimeMillis() - produceTimestamp;
+  }

Review Comment:
   I still don't prefer it here, but I would look more kindly on a method like:
   ```
   protected long getCurrentTimeMillis() {...}
   ```
   or 
   ```
   protected long calcMillisSince(long time) { ... }
   ```
   @ZihanLi58 may share her thoughts.
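   
   a minimal sketch of the second variant, assuming the produce timestamp is epoch millis:
   
   ```
   // elapsed wall-clock millis since `timeMillis`; a test can override this to pin the clock
   protected long calcMillisSince(long timeMillis) {
     return System.currentTimeMillis() - timeMillis;
   }
   ```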



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   renaming a field in the schema is an incompatible change, as is:
   * reordering fields
   * adding a new field anywhere but the end of the record
   * adding a field without a default.
   
   this PR's code looks like only the consumer side.  of course the producer would also need to change in lock-step to now `setTxId`, rather than `setTimestamp`.  since such tight coordination between producer(s) and consumer(s) is challenging, among other reasons, we prefer to avoid incompatible changes



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -155,6 +156,14 @@ public void testConsumerManualOffsetCommit() throws Exception {
     consumer.shutDown();
   }
 
+  @Test
+  public void testCalculateProduceToConsumeLag() {
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigFactory.empty(),
+        NUM_PARTITIONS);
+    Long produceTimestamp = 1234567890000L;
+    Assert.assertTrue(consumer.getProduceToConsumeLag(produceTimestamp).equals(123L));

Review Comment:
   this assertion isn't easily verified when scattered across files.  much clearer would be instead to override the current time via an anonymous inner class of `MockedHighLevelConsumer`.
   
   also, totally fine to make testing timestamps 'short':
   ```
   long currentTimeMillis = 1123L;
   long produceTimestamp = 1000L;
   ```
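   
   putting both suggestions together, a hypothetical version of the test, assuming `getProduceToConsumeLag` reads the clock through a protected `getCurrentTimeMillis()` hook (which this PR does not yet have):
   
   ```
   @Test
   public void testCalculateProduceToConsumeLag() {
     long currentTimeMillis = 1123L;
     long produceTimestamp = 1000L;
     // anonymous subclass pins the clock, so the expected lag (123 ms) is verifiable in one place
     MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigFactory.empty(), NUM_PARTITIONS) {
       @Override
       protected long getCurrentTimeMillis() {
         return currentTimeMillis;
       }
     };
     Assert.assertEquals(consumer.getProduceToConsumeLag(produceTimestamp).longValue(), 123L);
   }
   ```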



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -36,15 +36,17 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = "gobblin.jobMonitor.slaevent.rejectedevents";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
       "gobblin.jobMonitor.kafka.messageParseFailures";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.successful.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.failed.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
+  public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.lag";

Review Comment:
   this didn't register earlier: in kafka contexts, I see lag used for the count of unprocessed messages; e.g. https://sematext.com/blog/kafka-consumer-lag-offsets-monitoring/#how-is-kafka-consumer-lag-calculated
   
   to avoid confusion, perhaps we should name it 'delay' or 'delayMillis'?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -119,15 +125,18 @@ protected void processMessage(DecodeableKafkaRecord message) {
       try {
         dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue();
       } catch (IOException e) {
-        log.warn("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e);
+        log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e);
         this.unexpectedErrors.mark();
+        return;
       } catch (SpecNotFoundException e) {
-        log.warn("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName,
+        log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName,
             flowExecutionId, e);
         this.unexpectedErrors.mark();
+        return;
       } catch (SQLException throwables) {
-        log.warn("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables);
+        log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables);
         throwables.printStackTrace();

Review Comment:
   what's the intent w/ printing... since it's already given to `log.error()`?





[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1072949890


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",
+    "type" : "string",
+    "doc" : "ID to uniquely identify the transaction",
+    "compliance" : "NONE"
+  }, {
+    "name" : "produceTimestamp",
     "type" : "long",
-    "doc" : "Time the change occurred",
+    "doc" : "Time the change produced to topic",

Review Comment:
   is it correct that this is not the same as when the store recorded the update, because there may be an async lag between store update and when the corresponding change notification is produced?  if so, perhaps have this doc clarify.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;

Review Comment:
   general comment, since I don't see tests here: directly reaching into the environment to grab the time can complicate testing.  easier testability comes from either taking a ctor param of a function to produce the time (Strategy pattern) or wrapping a time call within a protected method, which a test could override to supply whatever known/expected timestamps.
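   
   a tiny sketch of the first option (Strategy pattern); the `LongSupplier` constructor parameter and class name are hypothetical:
   
   ```
   import java.util.function.LongSupplier;
   
   public class ChangeMonitorClockExample {
     private final LongSupplier clock;  // System::currentTimeMillis in production
   
     // tests pass a fixed supplier, e.g. () -> 1123L
     public ChangeMonitorClockExample(LongSupplier clock) {
       this.clock = clock;
     }
   
     long produceToConsumeDelayMillis(long produceTimestampMillis) {
       return clock.getAsLong() - produceTimestampMillis;
     }
   }
   ```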



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -171,5 +178,6 @@ protected void createMetrics() {
     this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   clearly some impl overlap between these two change monitor classes.  may be worth investigating later whether they could both use the same base class to take care of the message reading, caching, and also monitoring.  since they both read `GenericStoreChangeEvent`s, this seems quite promising.  only store-specific logic need be in either of the two derived classes.





[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1082123322


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   as mentioned in my reply, I'm not sure that partition delay would be uniform, so, methodology-wise, to generate a singular number, it seems we presently take the last value seen during the collection period regardless of partition.
   
   that's questionable, from a monitoring perspective.
   
   another option might be to use the max value during the collection period (HWM).  a variant would be some percentile, such as P99/P95 (e.g. via a t-digest).
   
   most salient, monitoring-wise, might be the max of the per-partition most-recently-seen values.  this captures recovery/improvement during the collection window.
   
   considering the max/HWM may be easiest to code, it seems a reasonable approximation.
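   
   a sketch of that last idea, the max over per-partition most-recently-seen values (the map and the partition id lookup are assumptions for illustration):
   
   ```
   // most-recently-seen delay per partition, written from processMessage, e.g.:
   //   lastDelayMillisByPartition.put(partitionId, calcMillisSince(produceTimestamp));
   private final java.util.concurrent.ConcurrentHashMap<Integer, Long> lastDelayMillisByPartition =
       new java.util.concurrent.ConcurrentHashMap<>();
   
   // gauge supplier: worst currently-lagging partition; still reflects recovery,
   // because each entry holds the latest value observed for that partition
   private long maxRecentDelayMillis() {
     return lastDelayMillisByPartition.values().stream()
         .mapToLong(Long::longValue).max().orElse(0L);
   }
   ```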



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = getProduceToConsumeLag(produceTimestamp);

Review Comment:
   even with only one thread, I don't see how we're guaranteed to read from all partitions at the same rate.  e.g. what if messages skew to some partitions more than others?  if the per-partition produce rate differs during a time when the single-threaded consumer can't keep up, couldn't processing delay per partition become non-uniform?



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   gotcha.  understanding now that these messages don't travel on a message topic and instead move within a single process space, I agree the producer and consumer can easily update in lockstep, so compatibility's not a concern



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -176,6 +184,7 @@ protected void createMetrics() {
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);

Review Comment:
   if you choose to go with the max/HWM, I suggest an `AtomicLong` that you `.getAndSet(0L)` here, and meanwhile `getAndAccumulate` above (using max).
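   
   concretely, a sketch of that high-water-mark pattern (the field name is a placeholder):
   
   ```
   private final java.util.concurrent.atomic.AtomicLong delayHighWaterMarkMillis =
       new java.util.concurrent.atomic.AtomicLong(0L);
   
   // in processMessage: retain the max delay observed during the collection window
   //   delayHighWaterMarkMillis.getAndAccumulate(delayMillis, Math::max);
   
   // in the gauge supplier: report the window's max, then reset for the next window
   //   () -> delayHighWaterMarkMillis.getAndSet(0L)
   ```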





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081826513


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = getProduceToConsumeLag(produceTimestamp);

Review Comment:
   This will report delay for all partitions. We consume from all partitions and process them in one thread here. I'll add a comment to explain that. 



[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1080737848


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;

Review Comment:
   I tried to refactor the processMessage code into two methods, one that would obtain all the information from the message and a second that would perform the corresponding action on the store. Because Java doesn't support tuple unpacking, I would have to assign all the values explicitly and end up with some overly complex code. I did, however, separate this produce-to-consume calculation into its own method that I overrode and created a small test around.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1080604384


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",
+    "type" : "string",
+    "doc" : "ID to uniquely identify the transaction",
+    "compliance" : "NONE"
+  }, {
+    "name" : "produceTimestamp",
     "type" : "long",
-    "doc" : "Time the change occurred",
+    "doc" : "Time the change produced to topic",

Review Comment:
   Yes, this is not the same time as when the store recorded the update. The lag between the row update and this produce time will also be measured (in a separate part of the code). I'm clarifying this in the doc.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1080587369


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   I want to rename this field to be clearer; this timestamp will specifically be produceTimestamp. I will update dependent code in other places before deploying. Is there still an issue with the incompatible schema change?





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081680380


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -333,4 +333,8 @@ public void run() {
       }
     }
   }
+
+  public Long getProduceToConsumeLag(Long produceTimestamp) {
+    return System.currentTimeMillis() - produceTimestamp;
+  }

Review Comment:
   I'm changing it to a more general function, `calcMillisSince`, since we may want to use it to measure delay in other extensions of the class as well. Later I may refactor the two monitors to derive from a common base class and move it there.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081842299


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -176,6 +185,7 @@ protected void createMetrics() {
     this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   Yes, the gauge value should be tied to changes to `produceToConsumeLagValue`. We don't have a function to update a gauge's value; from what I can see, we define the gauge's value to map back to a changing variable (either volatile or atomic). I'm updating the value to be a volatile variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1084527436


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   I agree on not complicating.  here's my reasoning:
   * multiple incoming partitions get combined into a single input
   * each partition is read into a blocking queue, so in cases of overload, the partition can't drain completely to its queue
   * under such conditions, reading multiple partitions more closely resembles round-robin
   * round-robin under overload conditions, when additionally skewed partitions, would lead the loaded partitions to fall farther behind the non-skewed ones
   
   thus, when overload and partition skew, seems possible to have non-uniform delay.
   
   in practice, not sure how likely both those would be.  perhaps not very.
   
   measurement-wise, against non-uniform delay, the current impl's essentially randomly chosen value would make the metric to jump around, whereas a high-water mark or percentile would consistently track worst case or each collection interval.
   
   anyway, if my reasoning above checks out, and the conditions seem possible to arise, easy to update to the `AtomicLong`.  if they seem improbable, keep code as-is, but perhaps add a warning/comment about where it breaks down.  seem reasonable?
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   I agree on not complicating.  here was my reasoning:
   * multiple incoming partitions get combined into a single input
   * each partition is read into a blocking queue, so in cases of overload, the partition can't drain completely to its queue
   * under such conditions, reading multiple partitions more closely resembles round-robin
   * round-robin under overload conditions, when additionally skewed partitions, would lead the loaded partitions to fall farther behind the non-skewed ones
   
   thus, when overload and partition skew, seems possible to have non-uniform delay.
   
   in practice, not sure how likely both those would be.  perhaps not very.
   
   measurement-wise, against non-uniform delay, the current impl's essentially randomly chosen value would make the metric to jump around, whereas a high-water mark or percentile would consistently track worst case or each collection interval.
   
   anyway, if my reasoning above checks out, and the conditions seem possible to arise, easy to update to the `AtomicLong`.  if they seem improbable, keep code as-is, but perhaps add a warning/comment about where it breaks down.  seem reasonable?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1084527436


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   I agree on not complicating.  here was my reasoning:
   * multiple incoming partitions get combined into a single input
   * each partition is read into a blocking queue, so in cases of overload, the partition can't drain completely to its queue
   * under such conditions, reading multiple partitions more closely resembles round-robin
   * round-robin under overload conditions, when partitions are additionally skewed, would lead the loaded partitions to fall farther behind the non-skewed ones
   
   thus, under overload and partition skew, it seems possible to have non-uniform delay.
   
   in practice, not sure how likely both those would be.  if not very, that's fair.
   
   measurement-wise, against non-uniform delay, the current impl's essentially randomly chosen value would make the metric jump around, whereas a high-water mark or percentile would consistently track the worst case for each collection interval.
   
   anyway, if my reasoning above checks out, and the conditions seem possible to arise, easy to update to the `AtomicLong`.  if they seem improbable, keep code as-is, but perhaps add a warning/comment about where it breaks down.  WDYT?
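   
   for concreteness, a minimal sketch of the high-water-mark variant (class and method names like `MaxDelayTracker` are illustrative, not from the PR):
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.Supplier;
   
   /** Tracks the worst produce-to-consume delay seen since the last metric read. */
   public class MaxDelayTracker {
     private final AtomicLong maxDelayMillis = new AtomicLong(0);
   
     /** Call from processMessage() with each event's produce timestamp. */
     public void record(long produceTimestampMillis) {
       long delayMillis = System.currentTimeMillis() - produceTimestampMillis;
       maxDelayMillis.accumulateAndGet(delayMillis, Math::max);  // keep only the max
     }
   
     /** Wire as the gauge supplier; reading resets the mark for the next interval. */
     public Supplier<Long> gaugeValue() {
       return () -> maxDelayMillis.getAndSet(0);
     }
   }
   ```
   
   reading via `getAndSet(0)` makes each collection interval report its own worst case, rather than whatever value happened to be written last.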
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1072969190


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -171,5 +178,6 @@ protected void createMetrics() {
     this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   clearly some impl overlap between these two change monitor classes.  may be worth investigating later whether they could both use the same base class to take care of the message reading, caching, and also monitoring.  since they both read `GenericStoreChangeEvent`s, this seems quite promising; only store-specific handling logic need be in either of the two derived classes.
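   
   for illustration, the shared shape might look roughly like this (hypothetical class and method names, assuming the generated `GenericStoreChangeEvent` Avro class; the real monitors would want a bounded/expiring cache rather than a plain set):
   
   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   
   /** Hypothetical sketch only: common handling for both change monitors. */
   public abstract class StoreChangeMonitorBase {
     // dedup on the change identifier, since "at least once" delivery can repeat events
     private final Set<String> seenChangeIdentifiers = ConcurrentHashMap.newKeySet();
   
     protected void processMessage(String key, GenericStoreChangeEvent event) {
       String changeIdentifier = event.getTxId() + key;
       if (!seenChangeIdentifiers.add(changeIdentifier)) {
         return;  // duplicate delivery of the same transaction; skip
       }
       handleChange(key, event);  // store-specific logic lives in the subclass
     }
   
     /** The only part that differs between the spec store and dag action store monitors. */
     protected abstract void handleChange(String key, GenericStoreChangeEvent event);
   }
   ```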



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081645025


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   Yes, this is incompatible, but I'll be changing the producer side in lockstep by updating both versions in our deployed service at once. I could add the new field at the end of the record, but I wanted to rename the timestamp field to be clearer about which timestamp it is, now that I understand it was not the one I intended.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1083159563


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);

Review Comment:
   Our main concern with measuring the delays here is to flag if there's a big timing difference between the tx occurring in the store and us being able to consume the event. I suspect the delay will more likely be between the tx occurring and Brooklin producing, rather than us being slow as consumers. I don't see why partition delay would actually be non-uniform: if we're performing poorly overall, it would affect all partitions rather than one in particular. I don't want to overcomplicate this calculation, as it's just to give us a ballpark on the delay; the more important one is the eventToProduceDelay, which I will measure in another location.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 merged pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 merged PR #3625:
URL: https://github.com/apache/gobblin/pull/3625


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1072860103


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;
+    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}",
+        flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeLagValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = produceTimestamp + key;

Review Comment:
   You want to use tid here?



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   This seems to be an incompatible change in schema. Maybe add another field as txId and keep timestamp here? 
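   
   i.e., the compatible evolution might be a fragment like this (doc strings illustrative, and assuming `timestamp` is a `long`; the appended field needs a default so old readers still resolve):
   
   ```json
   {
     "name" : "timestamp",
     "type" : "long",
     "doc" : "Time the change event was produced",
     "compliance" : "NONE"
   }, {
     "name" : "txId",
     "type" : ["null", "string"],
     "default" : null,
     "doc" : "Transaction id used to dedup change stream events",
     "compliance" : "NONE"
   }
   ```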



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#issuecomment-1382638995

   @ZihanLi58 @phet can you review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
umustafi commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081821627


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -36,15 +36,17 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = "gobblin.jobMonitor.slaevent.rejectedevents";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
       "gobblin.jobMonitor.kafka.messageParseFailures";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.successful.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.failed.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
+  public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.lag";

Review Comment:
   That's a good point: they do use lag to mean the count of messages, so we can use delay millis to be clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1081820136


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -333,4 +333,8 @@ public void run() {
       }
     }
   }
+
+  public Long getProduceToConsumeLag(Long produceTimestamp) {
+    return System.currentTimeMillis() - produceTimestamp;
+  }

Review Comment:
   I'm not strongly against adding `calcMillisSince` here.
   
   still, let's acknowledge that once it becomes part of this class' interface, it generally can't move, w/o risk of breaking OSS clients.  at most we could deprecate before later removal... but that's not a helpful experience for our users, to force them to switch later.
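   
   one alternative, if we'd rather not commit the interface: a static helper in a utils class (`TimingUtils` here is hypothetical, not an existing Gobblin class), which could move later without breaking subclass clients:
   
   ```java
   /** Hypothetical utility: keeps the delay math out of HighLevelConsumer's public surface. */
   public final class TimingUtils {
     private TimingUtils() {}
   
     /** Milliseconds elapsed since the given epoch-millis timestamp. */
     public static long calcMillisSince(long epochMillis) {
       return System.currentTimeMillis() - epochMillis;
     }
   }
   ```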



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org