You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:33 UTC
[15/50] incubator-gobblin git commit: [GOBBLIN-402] Add more metrics for gobblin cluster and fix the getJobs slowness issue
[GOBBLIN-402] Add more metrics for gobblin cluster and fix the getJobs slowness issue
Closes #2276 from yukuai518/morem
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/de83a3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/de83a3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/de83a3fb
Branch: refs/heads/0.12.0
Commit: de83a3fb5f2644e9657b546c7415d1e8f5336a2c
Parents: 8879cde
Author: Kuai Yu <ku...@linkedin.com>
Authored: Fri Feb 2 13:25:53 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Feb 2 13:25:53 2018 -0800
----------------------------------------------------------------------
.../gobblin/cluster/GobblinClusterManager.java | 6 +-
.../cluster/GobblinHelixJobScheduler.java | 33 ++++++-
.../StreamingJobConfigurationManager.java | 3 +
.../service/StreamingKafkaSpecConsumer.java | 92 +++++++++++++++++++-
.../apache/gobblin/runtime/api/JobCatalog.java | 7 +-
.../runtime/api/JobExecutionLauncher.java | 6 ++
.../runtime/job_monitor/KafkaJobMonitor.java | 3 +
7 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 77e511e..3393df6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -153,6 +153,9 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
private MutableJobCatalog jobCatalog;
@Getter
private GobblinHelixJobScheduler jobScheduler;
+ @Getter
+ private JobConfigurationManager jobConfigurationManager;
+
private final String clusterName;
private final Config config;
private final MetricContext metricContext;
@@ -209,7 +212,8 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
this.jobScheduler = buildGobblinHelixJobScheduler(config, this.appWorkDir, getMetadataTags(clusterName, applicationId),
schedulerService);
this.applicationLauncher.addService(this.jobScheduler);
- this.applicationLauncher.addService(buildJobConfigurationManager(config));
+ this.jobConfigurationManager = buildJobConfigurationManager(config);
+ this.applicationLauncher.addService(this.jobConfigurationManager);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 36ba542..141e3d1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -157,7 +159,17 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private final ContextAwareTimer timeBeforeJobScheduling;
private final ContextAwareTimer timeBeforeJobLaunching;
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final ContextAwareGauge<Integer> executorActiveCount;
+ private final ContextAwareGauge<Integer> executorMaximumPoolSize;
+ private final ContextAwareGauge<Integer> executorPoolSize;
+ private final ContextAwareGauge<Integer> executorCorePoolSize;
+ private final ContextAwareGauge<Integer> executorQueueSize;
+
public Metrics(final MetricContext metricContext) {
+ // Thread executor reference from job scheduler
+ this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor;
+
// All historical counters
this.totalJobsLaunched = new AtomicLong(0);
this.totalJobsCompleted = new AtomicLong(0);
@@ -177,6 +189,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES);
this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES);
+
+ // executor metrics
+ this.executorActiveCount = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount());
+ this.executorMaximumPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize());
+ this.executorPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize());
+ this.executorCorePoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize());
+ this.executorQueueSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size());
}
private void updateTimeBeforeJobScheduling (Properties jobConfig) {
@@ -196,7 +215,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
+ /**
+ * Returns the gauges reported by this scheduler: job lifecycle gauges followed by
+ * thread-pool gauges for the scheduler's job executor. A new mutable list is built on each call.
+ */
@Override
public Collection<ContextAwareGauge<?>> getGauges() {
- return ImmutableList.of(numJobsRunning, numJobsLaunched, numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled);
+ List<ContextAwareGauge<?>> list = Lists.newArrayList();
+ // Job lifecycle gauges (the set previously returned on its own).
+ list.add(numJobsRunning);
+ list.add(numJobsLaunched);
+ list.add(numJobsCompleted);
+ list.add(numJobsCommitted);
+ list.add(numJobsFailed);
+ list.add(numJobsCancelled);
+ // Executor gauges, read from the ThreadPoolExecutor captured in the Metrics constructor.
+ list.add(executorActiveCount);
+ list.add(executorMaximumPoolSize);
+ list.add(executorPoolSize);
+ list.add(executorCorePoolSize);
+ list.add(executorQueueSize);
+ return list;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index 849dd6a..3c01704 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -45,6 +45,8 @@ import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.runtime.api.SpecConsumer;
+import lombok.Getter;
+
/**
* A {@link JobConfigurationManager} that fetches job specs from a {@link SpecConsumer} in a loop
@@ -56,6 +58,7 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager {
private final ExecutorService fetchJobSpecExecutor;
+ @Getter
private final SpecConsumer specConsumer;
private final long stopTimeoutSeconds;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 23966e9..4764603 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -21,18 +21,29 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+
import org.slf4j.Logger;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -49,6 +60,9 @@ import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY;
+
+import javax.annotation.Nonnull;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -56,12 +70,16 @@ import lombok.extern.slf4j.Slf4j;
* SpecConsumer that consumes from kafka in a streaming manner
* Implemented {@link AbstractIdleService} for starting up and shutting down.
*/
-public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable {
+public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable, StandardMetricsBridge {
public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize";
private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
+ @Getter
private final AvroJobSpecKafkaJobMonitor _jobMonitor;
private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue;
private final MutableJobCatalog _jobCatalog;
+ private final MetricContext _metricContext;
+ private final Metrics _metrics;
+ private final boolean _isInstrumentedEnabled;
public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) {
String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic,
@@ -73,10 +91,12 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
} catch (IOException e) {
throw new RuntimeException("Could not create job monitor", e);
}
-
+ _isInstrumentedEnabled = GobblinMetrics.isEnabled(ConfigUtils.configToProperties(config));
_jobCatalog = jobCatalog;
_jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+ _metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+ _metrics = new Metrics(this._metricContext);
}
public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) {
@@ -98,7 +118,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
try {
Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take();
-
+ _metrics.jobSpecDeqCount.incrementAndGet();
do {
changesSpecs.add(specPair);
@@ -145,6 +165,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
try {
_jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.ADD, addedJob));
+ _metrics.jobSpecEnqCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -159,6 +180,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
_jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.DELETE, jobSpecBuilder.build()));
+ _metrics.jobSpecEnqCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -169,9 +191,73 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
try {
_jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob));
+ _metrics.jobSpecEnqCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
+
+ private class Metrics extends StandardMetricsBridge.StandardMetrics {
+ private ContextAwareGauge<Integer> jobSpecQueueSize;
+ private ContextAwareGauge<Long> jobSpecEnq;
+ private ContextAwareGauge<Long> jobSpecDeq;
+ private ContextAwareGauge<Long> jobSpecConsumed;
+ private AtomicLong jobSpecEnqCount = new AtomicLong(0);
+ private AtomicLong jobSpecDeqCount = new AtomicLong(0);
+
+ public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = "specConsumerJobSpecQueueSize";
+ public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq";
+ public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq";
+ public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed";
+
+
+ public Metrics(MetricContext context) {
+ this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
+ this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get());
+ this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get());
+ this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
+ ()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount());
+ }
+
+ public Collection<ContextAwareGauge<?>> getGauges() {
+ List list = Lists.newArrayList();
+ list.add(jobSpecQueueSize);
+ list.add(jobSpecEnq);
+ list.add(jobSpecDeq);
+ list.add(jobSpecConsumed);
+ return list;
+ }
+ }
+
+ /**
+ * Exposes this consumer's spec-queue gauges through the {@link StandardMetricsBridge} contract.
+ *
+ * @return the {@link Metrics} instance built in the constructor
+ */
+ @Override
+ public StandardMetrics getStandardMetrics() {
+ // The class is concrete and already owns a fully built Metrics; throwing here would leave
+ // the queue/enqueue/dequeue gauges unreachable to anything that collects via the bridge.
+ return _metrics;
+ }
+
+ /**
+ * Returns the metric context in which this consumer's gauges are created.
+ */
+ @Override
+ @Nonnull
+ public MetricContext getMetricContext() {
+ return this._metricContext;
+ }
+
+ /**
+ * Reports whether metrics were enabled in the configuration given to the constructor.
+ */
+ @Override
+ public boolean isInstrumentationEnabled() {
+ return this._isInstrumentedEnabled;
+ }
+
+ /**
+ * This consumer contributes no extra metric tags; {@code state} is ignored.
+ */
+ @Override
+ public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
+ return ImmutableList.<Tag<?>>of();
+ }
+
+ /**
+ * Not supported: the metric context is a final field assigned once in the constructor.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void switchMetricContext(List<Tag<?>> tags) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported: the metric context is a final field assigned once in the constructor.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void switchMetricContext(MetricContext context) {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 950b86d..42ecef3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -95,12 +95,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get());
this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get());
this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get());
- this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
- long startTime = System.currentTimeMillis();
- int size = jobCatalog.getJobs().size();
- updateGetJobTime(startTime);
- return size;
- });
+ this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->(int)(totalAddedJobs.get() - totalDeletedJobs.get()));
}
public void updateGetJobTime(long startTime) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index a7e5878..3f50ee7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -49,6 +49,12 @@ public interface JobExecutionLauncher extends Instrumentable {
public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
+ public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount";
+ public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize";
+ public static final String EXECUTOR_POOL_SIZE = "executorPoolSize";
+ public static final String EXECUTOR_CORE_POOL_SIZE = "executorCorePoolSize";
+ public static final String EXECUTOR_QUEUE_SIZE = "executorQueueSize";
+
public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent";
public static final String JOB_EXECID_META = "jobExecId";
public static final String JOB_LAUNCHED_OPERATION_TYPE = "JobLaunched";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 8181c16..ba79305 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import kafka.message.MessageAndMetadata;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -51,7 +52,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
private final MutableJobCatalog jobCatalog;
+ @Getter
private Counter newSpecs;
+ @Getter
private Counter remmovedSpecs;
/**