You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:42 UTC

[24/50] incubator-gobblin git commit: [GOBBLIN-403] Fix null pointer issue due to kafkajobmonitor metrics is not initialized in the constructor

[GOBBLIN-403] Fix null pointer issue due to kafkajobmonitor metrics is not initialized in the constructor

Closes #2277 from yukuai518/fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/19b2d81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/19b2d81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/19b2d81b

Branch: refs/heads/0.12.0
Commit: 19b2d81b9539207beaca70c2efb36f258160fa27
Parents: 34de6bf
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Feb 6 14:16:03 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Feb 6 14:16:03 2018 -0800

----------------------------------------------------------------------
 .../service/StreamingKafkaSpecConsumer.java     | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/19b2d81b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 4764603..5fd5413 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -203,6 +203,8 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
     private ContextAwareGauge<Long> jobSpecEnq;
     private ContextAwareGauge<Long> jobSpecDeq;
     private ContextAwareGauge<Long> jobSpecConsumed;
+    private ContextAwareGauge<Long> jobSpecParseFailures;
+
     private AtomicLong jobSpecEnqCount = new AtomicLong(0);
     private AtomicLong jobSpecDeqCount = new AtomicLong(0);
 
@@ -210,14 +212,30 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
     public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
     public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";
     public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed";
-
+    public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
 
     public Metrics(MetricContext context) {
       this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
       this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get());
       this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get());
       this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
-          ()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount());
+          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures());
+      this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures());
+    }
+
+    private long getNewSpecs() {
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() : 0;
+    }
+
+    private long getRemovedSpecs() {
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != null?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0;
+    }
+
+    private long getMessageParseFailures() {
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
     }
 
     public Collection<ContextAwareGauge<?>> getGauges() {
@@ -226,13 +244,14 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
       list.add(jobSpecEnq);
       list.add(jobSpecDeq);
       list.add(jobSpecConsumed);
+      list.add(jobSpecParseFailures);
       return list;
     }
   }
 
   @Override
   public StandardMetrics getStandardMetrics() {
-    throw new UnsupportedOperationException("Implemented in sub class");
+    return this._metrics;
   }
 
   @Nonnull