You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this text version.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 23:29:26 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1125] Add metrics to measure job status state store performance
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d8d579a [GOBBLIN-1125] Add metrics to measure job status state store performance
d8d579a is described below
commit d8d579a42ec901dd74b6f453bd334c77e9498195
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 23 16:29:18 2020 -0700
[GOBBLIN-1125] Add metrics to measure job status state store performance
Closes #2965 from sv2000/jobStatusMetrics
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 7 +++++
.../service/monitoring/JobStatusRetriever.java | 14 +++++++--
.../service/monitoring/KafkaJobStatusMonitor.java | 9 +++++-
.../monitoring/MysqlJobStatusRetriever.java | 33 ++++++++++++++++------
.../monitoring/MysqlJobStatusRetrieverTest.java | 12 ++++----
5 files changed, 58 insertions(+), 17 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 05dc4c3..937c705 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -134,6 +134,7 @@ public class KafkaAvroJobStatusMonitorTest {
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -237,6 +238,7 @@ public class KafkaAvroJobStatusMonitorTest {
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
@@ -460,5 +462,10 @@ public class KafkaAvroJobStatusMonitorTest {
protected void processMessage(DecodeableKafkaRecord record) {
super.processMessage(record);
}
+
+ @Override
+ protected void buildMetricsContextAndMetrics() {
+ super.buildMetricsContextAndMetrics();
+ }
}
}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 9e678d5..668824b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -21,22 +21,32 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Iterators;
+import com.typesafe.config.ConfigFactory;
+import lombok.Getter;
-import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.util.ConfigUtils;
/**
* Retriever for {@link JobStatus}.
*/
-@Alpha
public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
public static final String EVENT_NAME_FIELD = "eventName";
public static final String NA_KEY = "NA";
+ @Getter
+ protected final MetricContext metricContext;
+
+ protected JobStatusRetriever() {
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ }
+
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
long flowExecutionId);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d548177..30766d5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -24,6 +24,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -42,6 +44,7 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.service.ExecutionStatus;
@@ -60,6 +63,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
public static final String STATE_STORE_TABLE_SUFFIX = "gst";
public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
+ public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
@@ -118,7 +123,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
try {
org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
if (jobStatus != null) {
- addJobStatusToStateStore(jobStatus, this.stateStore);
+ try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
+ addJobStatusToStateStore(jobStatus, this.stateStore);
+ }
}
} catch (IOException ioe) {
String messageStr = new String(message.getValue(), Charsets.UTF_8);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 56289bd..a7a760c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -23,7 +23,8 @@ import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import com.google.common.base.Joiner;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.typesafe.config.Config;
@@ -33,21 +34,27 @@ import lombok.Getter;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.ExecutionStatus;
/**
* Mysql based Retriever for {@link JobStatus}.
*/
public class MysqlJobStatusRetriever extends JobStatusRetriever {
+ public static final String MYSQL_JOB_STATUS_RETRIEVER_PREFIX = "mysqlJobStatusRetriever";
+ public static final String GET_LATEST_JOB_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestJobStatus");
+ public static final String GET_LATEST_FLOW_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowStatus");
+ public static final String GET_ALL_FLOW_STATUSES_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getAllFlowStatuses");
- public static final String CONF_PREFIX = "mysqlJobStatusRetriever";
@Getter
private MysqlJobStatusStateStore<State> stateStore;
public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
- config = config.getConfig(CONF_PREFIX).withFallback(config);
+ config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
}
@@ -55,7 +62,10 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
try {
- List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+ List<State> jobStatusStates;
+ try (Timer.Context context = this.metricContext.contextAwareTimer(GET_LATEST_FLOW_STATUS_METRIC).time()) {
+ jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+ }
return getJobStatuses(jobStatusStates);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -69,7 +79,10 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
try {
- List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
+ List<State> jobStatusStates;
+ try (Timer.Context context = this.metricContext.contextAwareTimer(GET_LATEST_JOB_STATUS_METRIC).time()) {
+ jobStatusStates = this.stateStore.getAll(storeName, tableName);
+ }
return getJobStatuses(jobStatusStates);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -81,9 +94,11 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
try {
- List<State> jobStatusStates = this.stateStore.getAll(storeName);
- List<Long> flowExecutionIds = jobStatusStates.stream()
- .map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
+ List<State> jobStatusStates;
+ try (Timer.Context context = this.metricContext.contextAwareTimer(GET_ALL_FLOW_STATUSES_METRIC).time()) {
+ jobStatusStates = this.stateStore.getAll(storeName);
+ }
+ List<Long> flowExecutionIds = jobStatusStates.stream().map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
.collect(Collectors.toList());
return ImmutableList.copyOf(Iterables.limit(new TreeSet<>(flowExecutionIds).descendingSet(), count));
} catch (IOException e) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 48fd157..88e60f5 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -18,13 +18,15 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -39,9 +41,9 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
ConfigBuilder configBuilder = ConfigBuilder.create();
- configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
- configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
- configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
+ configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();