You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2023/01/17 21:43:51 UTC

[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consumeā€¦

ZihanLi58 commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1072860103


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;
+    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} tid: {} operation: {} lag: {}",
+        flowGroup, flowName, flowExecutionId, tid, operation, produceToConsumeLagValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = produceTimestamp + key;

Review Comment:
   You want to use tid here?



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",

Review Comment:
   This seems to be an incompatible change in schema. Maybe add another field as txId and keep timestamp here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org