Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2023/01/18 00:33:13 UTC

[GitHub] [gobblin] phet commented on a diff in pull request #3625: [GOBBLIN-1766] Define metric to measure lag from producing to consume…

phet commented on code in PR #3625:
URL: https://github.com/apache/gobblin/pull/3625#discussion_r1072949890


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc:
##########
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",
+    "type" : "string",
+    "doc" : "ID to uniquely identify the transaction",
+    "compliance" : "NONE"
+  }, {
+    "name" : "produceTimestamp",
     "type" : "long",
-    "doc" : "Time the change occurred",
+    "doc" : "Time the change produced to topic",

Review Comment:
   Is it correct that this is not the same as when the store recorded the update, because there may be an asynchronous lag between the store update and the production of the corresponding change notification? If so, perhaps have this doc clarify that distinction.
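The distinction in the comment above can be made concrete with a small sketch. It assumes a hypothetical `storeUpdateTimestamp` (not a field in the reviewed schema) purely to show how the end-to-end delay would decompose, and why `produceTimestamp` only anchors the second half. `ChangeLag` and its methods are illustrative names, not Gobblin API.

```java
/**
 * Hypothetical helper: splits end-to-end change-propagation delay into two parts.
 * storeUpdateTimestamp is an ASSUMED value; the reviewed schema only carries produceTimestamp.
 */
final class ChangeLag {
  private ChangeLag() {}

  /** Delay between the store committing the change and the notification being produced. */
  static long storeToProduceMillis(long storeUpdateTimestamp, long produceTimestamp) {
    return produceTimestamp - storeUpdateTimestamp;
  }

  /** Delay the new metric targets: produced to the topic until consumed by the monitor. */
  static long produceToConsumeMillis(long produceTimestamp, long consumeTimestamp) {
    return consumeTimestamp - produceTimestamp;
  }

  /** End-to-end delay is the sum of the two parts above. */
  static long storeToConsumeMillis(long storeUpdateTimestamp, long produceTimestamp, long consumeTimestamp) {
    return storeToProduceMillis(storeUpdateTimestamp, produceTimestamp)
        + produceToConsumeMillis(produceTimestamp, consumeTimestamp);
  }
}
```

For example, a change stored at t=1000 ms, produced at t=1300 ms and consumed at t=1450 ms has a produce-to-consume lag of 150 ms but an end-to-end delay of 450 ms; the metric in this PR would only see the 150 ms.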



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -98,18 +102,20 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = value.getChangeEventIdentifier().getProduceTimestamp();
     String operation = value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeLagValue = System.currentTimeMillis() - produceTimestamp;

Review Comment:
   General comment, since I don't see tests here: reaching directly into the environment to grab the current time can complicate testing. Testability is easier either by taking a constructor parameter for a function that produces the time (Strategy pattern) or by wrapping the time call in a protected method, which a test could override to supply known/expected timestamps.
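Both options from the comment above can be sketched in one small class. This is a minimal illustration, not the PR's code: `LagTracker`, `onMessage` and `getProduceToConsumeLagMillis` are hypothetical names, and the time source is assumed to be a `java.util.function.LongSupplier` defaulting to `System::currentTimeMillis`.

```java
import java.util.function.LongSupplier;

/** Hypothetical lag tracker showing two ways to make "now" replaceable in tests. */
class LagTracker {
  // Option 1 (Strategy): the time source is injected through the constructor.
  private final LongSupplier clock;
  // volatile so a metrics gauge reading from another thread sees the latest value.
  private volatile long produceToConsumeLagMillis;

  LagTracker() {
    this(System::currentTimeMillis); // production default: wall-clock time
  }

  LagTracker(LongSupplier clock) {
    this.clock = clock;
  }

  // Option 2 (overridable hook): a test subclass can override this method instead.
  protected long currentTimeMillis() {
    return clock.getAsLong();
  }

  /** Records the lag for one consumed message, analogous to the line under review. */
  void onMessage(long produceTimestamp) {
    produceToConsumeLagMillis = currentTimeMillis() - produceTimestamp;
  }

  long getProduceToConsumeLagMillis() {
    return produceToConsumeLagMillis;
  }
}
```

A test can then write `new LagTracker(() -> 1_500L)` or subclass it anonymously and override `currentTimeMillis()`; either way the expected lag is deterministic. `java.time.Clock` (with `Clock.fixed(...)` in tests) is a standard-library alternative to a bare `LongSupplier`.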



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -171,5 +178,6 @@ protected void createMetrics() {
     this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeLag = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_LAG, () -> produceToConsumeLagValue);

Review Comment:
   Clearly there is some implementation overlap between these two change monitor classes. It may be worth investigating later whether they could both use the same base class to take care of the message reading, caching, and monitoring. Since they both read `GenericStoreChangeEvent`s, this seems quite promising: only store-specific logic would need to live in either of the two derived classes.
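The base-class idea above is essentially the Template Method pattern. The sketch below is hypothetical throughout: `ChangeEvent` stands in for the shared change-event fields, `BaseStoreChangeMonitor` and `RecordingMonitor` are invented names, and "caching" is assumed to mean de-duplicating redelivered messages by transaction ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the fields both monitors read from a change event. */
final class ChangeEvent {
  final String txId;
  final long produceTimestamp;
  final String operation;

  ChangeEvent(String txId, long produceTimestamp, String operation) {
    this.txId = txId;
    this.produceTimestamp = produceTimestamp;
    this.operation = operation;
  }
}

/** Hypothetical base class: shared de-duplication and lag monitoring live here. */
abstract class BaseStoreChangeMonitor {
  private final LongSupplier clock;
  // Unbounded for brevity; a real cache would need eviction. Assumes a single consumer thread.
  private final Set<String> seenTxIds = new HashSet<>();
  private volatile long produceToConsumeLagMillis;
  private long processedCount;

  protected BaseStoreChangeMonitor(LongSupplier clock) {
    this.clock = clock;
  }

  /** Shared entry point; subclasses cannot change the common steps. */
  final void processMessage(String key, ChangeEvent event) {
    if (!seenTxIds.add(event.txId)) {
      return; // already handled this transaction, e.g. a redelivered message
    }
    produceToConsumeLagMillis = clock.getAsLong() - event.produceTimestamp;
    processedCount++;
    handleStoreSpecificChange(key, event);
  }

  /** The only part each derived monitor must implement. */
  protected abstract void handleStoreSpecificChange(String key, ChangeEvent event);

  long getProduceToConsumeLagMillis() {
    return produceToConsumeLagMillis;
  }

  long getProcessedCount() {
    return processedCount;
  }
}

/** Hypothetical derived monitor that just records what it was asked to handle. */
class RecordingMonitor extends BaseStoreChangeMonitor {
  final List<String> handled = new ArrayList<>();

  RecordingMonitor(LongSupplier clock) {
    super(clock);
  }

  @Override
  protected void handleStoreSpecificChange(String key, ChangeEvent event) {
    handled.add(event.operation + ":" + key);
  }
}
```

Making `processMessage` final keeps the shared reading, caching and metric updates in one place, so each store-specific monitor only overrides `handleStoreSpecificChange`.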



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
