You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/08/09 20:02:43 UTC

[GitHub] [gobblin] jack-moseley commented on a change in pull request #3354: [GOBBLIN-1508] update some cluster side metrics from guage to meter

jack-moseley commented on a change in pull request #3354:
URL: https://github.com/apache/gobblin/pull/3354#discussion_r685483360



##########
File path: gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
##########
@@ -196,44 +197,48 @@ public void onCancelJob(URI cancelledJobURI) {
 
       try {
         _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob));
-        _metrics.jobSpecEnqCount.incrementAndGet();
+        _metrics.specConsumerJobSpecEnq.mark();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
   }
 
   private class Metrics extends StandardMetricsBridge.StandardMetrics {
-    private AtomicLong jobSpecEnqCount = new AtomicLong(0);
-    private AtomicLong jobSpecDeqCount = new AtomicLong(0);
+    private final ContextAwareMeter specConsumerJobSpecEnq;
+    private final ContextAwareMeter specConsumerJobSpecDeq;
 
     public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = "specConsumerJobSpecQueueSize";
     public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
     public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";
-    public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed";
-    public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
 
     public Metrics(MetricContext context) {
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
-          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()));
-      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE,
+          StreamingKafkaSpecConsumer.this._jobSpecQueue::size));
+      this.specConsumerJobSpecEnq = context.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_ENQ);
+      this.contextAwareMetrics.add(this.specConsumerJobSpecEnq);
+      this.specConsumerJobSpecDeq = context.contextAwareMeter(SPEC_CONSUMER_JOB_SPEC_DEQ);
+      this.contextAwareMetrics.add(this.specConsumerJobSpecDeq);
+      this.contextAwareMetrics.add(_jobMonitor.getNewSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getUpdatedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getRemovedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getCancelledSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getConsumedSpecs());
+      this.contextAwareMetrics.add(_jobMonitor.getMessageParseFailures());
     }
 
     private long getNewSpecs() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null?
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null ?
           StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() : 0;
     }
 
     private long getRemovedSpecs() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs() != null?
-          StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs().getCount() : 0;
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getCancelledSpecs() != null ?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getCancelledSpecs().getCount() : 0;
     }
 
     private long getMessageParseFailures() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null ?
           StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
     }

Review comment:
       Are these methods not used anymore? Can we just remove them if so?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org