You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/08/09 21:01:57 UTC

[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3354: [GOBBLIN-1508] update some cluster side metrics from gauge to meter

arjun4084346 commented on a change in pull request #3354:
URL: https://github.com/apache/gobblin/pull/3354#discussion_r685518368



##########
File path: gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
##########
@@ -196,44 +197,48 @@ public void onCancelJob(URI cancelledJobURI) {
 
       try {
         _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob));
-        _metrics.jobSpecEnqCount.incrementAndGet();
+        _metrics.specConsumerJobSpecEnq.mark();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
   }
 
   private class Metrics extends StandardMetricsBridge.StandardMetrics {
-    private AtomicLong jobSpecEnqCount = new AtomicLong(0);
-    private AtomicLong jobSpecDeqCount = new AtomicLong(0);
+    private final ContextAwareMeter specConsumerJobSpecEnq;
+    private final ContextAwareMeter specConsumerJobSpecDeq;
 
     public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = "specConsumerJobSpecQueueSize";
     public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
     public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";
-    public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed";
-    public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
 
     public Metrics(MetricContext context) {
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
-          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE,
+          StreamingKafkaSpecConsumer.this._jobSpecQueue::size));
+      this.specConsumerJobSpecEnq = context.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_ENQ);
+      this.contextAwareMetrics.add(this.specConsumerJobSpecEnq);
+      this.specConsumerJobSpecDeq = context.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_DEQ);
+      this.contextAwareMetrics.add(this.specConsumerJobSpecDeq);
+      this.contextAwareMetrics.add(_jobMonitor.getNewSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getUpdatedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getRemovedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getCancelledSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getConsumedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getMessageParseFailures());
     }
 
     private long getNewSpecs() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null?
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null ?
           StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() : 0;
     }
 
     private long getRemovedSpecs() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs() != null?
-          StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs().getCount() : 0;
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getCancelledSpecs() != null ?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getCancelledSpecs().getCount() : 0;
     }
 
     private long getMessageParseFailures() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null ?
           StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
     }

Review comment:
       They are being used. We might not have been seeing these metrics so far, but that was due to a bug, which I am fixing in https://github.com/apache/gobblin/pull/3350




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org