You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:42 UTC
[24/50] incubator-gobblin git commit: [GOBBLIN-403] Fix null pointer
issue due to kafkajobmonitor metrics is not initialized in the constructor
[GOBBLIN-403] Fix null pointer issue due to kafkajobmonitor metrics is not initialized in the constructor
Closes #2277 from yukuai518/fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/19b2d81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/19b2d81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/19b2d81b
Branch: refs/heads/0.12.0
Commit: 19b2d81b9539207beaca70c2efb36f258160fa27
Parents: 34de6bf
Author: Kuai Yu <ku...@linkedin.com>
Authored: Tue Feb 6 14:16:03 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Feb 6 14:16:03 2018 -0800
----------------------------------------------------------------------
.../service/StreamingKafkaSpecConsumer.java | 25 +++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/19b2d81b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 4764603..5fd5413 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -203,6 +203,8 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
private ContextAwareGauge<Long> jobSpecEnq;
private ContextAwareGauge<Long> jobSpecDeq;
private ContextAwareGauge<Long> jobSpecConsumed;
+ private ContextAwareGauge<Long> jobSpecParseFailures;
+
private AtomicLong jobSpecEnqCount = new AtomicLong(0);
private AtomicLong jobSpecDeqCount = new AtomicLong(0);
@@ -210,14 +212,30 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";
public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed";
-
+ public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
public Metrics(MetricContext context) {
this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get());
this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get());
this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
- ()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount());
+ ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures());
+ this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures());
+ }
+
+ private long getNewSpecs() {
+ return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null?
+ StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() : 0;
+ }
+
+ private long getRemovedSpecs() {
+ return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != null?
+ StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0;
+ }
+
+ private long getMessageParseFailures() {
+ return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
+ StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
}
public Collection<ContextAwareGauge<?>> getGauges() {
@@ -226,13 +244,14 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
list.add(jobSpecEnq);
list.add(jobSpecDeq);
list.add(jobSpecConsumed);
+ list.add(jobSpecParseFailures);
return list;
}
}
@Override
public StandardMetrics getStandardMetrics() {
- throw new UnsupportedOperationException("Implemented in sub class");
+ return this._metrics;
}
@Nonnull