You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 23:29:26 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1125] Add metrics to measure job status state store performance

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d579a  [GOBBLIN-1125] Add metrics to measure job status state store performan…
d8d579a is described below

commit d8d579a42ec901dd74b6f453bd334c77e9498195
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 23 16:29:18 2020 -0700

    [GOBBLIN-1125] Add metrics to measure job status state store performan…
    
    Closes #2965 from sv2000/jobStatusMetrics
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |  7 +++++
 .../service/monitoring/JobStatusRetriever.java     | 14 +++++++--
 .../service/monitoring/KafkaJobStatusMonitor.java  |  9 +++++-
 .../monitoring/MysqlJobStatusRetriever.java        | 33 ++++++++++++++++------
 .../monitoring/MysqlJobStatusRetrieverTest.java    | 12 ++++----
 5 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 05dc4c3..937c705 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -134,6 +134,7 @@ public class KafkaAvroJobStatusMonitorTest {
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
     MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 
@@ -237,6 +238,7 @@ public class KafkaAvroJobStatusMonitorTest {
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
     MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
 
     ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 
@@ -460,5 +462,10 @@ public class KafkaAvroJobStatusMonitorTest {
     protected void processMessage(DecodeableKafkaRecord record) {
       super.processMessage(record);
     }
+
+    @Override
+    protected void buildMetricsContextAndMetrics() {
+      super.buildMetricsContextAndMetrics();
+    }
   }
 }
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 9e678d5..668824b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -21,22 +21,32 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.Iterators;
+import com.typesafe.config.ConfigFactory;
 
+import lombok.Getter;
 
-import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Retriever for {@link JobStatus}.
  */
-@Alpha
 public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
   public static final String EVENT_NAME_FIELD = "eventName";
   public static final String NA_KEY = "NA";
 
+  @Getter
+  protected final MetricContext metricContext;
+
+  protected JobStatusRetriever() {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+  }
+
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
       long flowExecutionId);
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d548177..30766d5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -24,6 +24,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -42,6 +44,7 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.service.ExecutionStatus;
@@ -60,6 +63,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
   public static final String STATE_STORE_TABLE_SUFFIX = "gst";
   public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
+  public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+      JOB_STATUS_MONITOR_PREFIX,  "getAndSetJobStatus");
 
   static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
   static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
@@ -118,7 +123,9 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     try {
       org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
       if (jobStatus != null) {
-        addJobStatusToStateStore(jobStatus, this.stateStore);
+        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
+          addJobStatusToStateStore(jobStatus, this.stateStore);
+        }
       }
     } catch (IOException ioe) {
       String messageStr = new String(message.getValue(), Charsets.UTF_8);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 56289bd..a7a760c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -23,7 +23,8 @@ import java.util.List;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
-import com.google.common.base.Joiner;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.typesafe.config.Config;
@@ -33,21 +34,27 @@ import lombok.Getter;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.ExecutionStatus;
 
 
 /**
  * Mysql based Retriever for {@link JobStatus}.
  */
 public class MysqlJobStatusRetriever extends JobStatusRetriever {
+  public static final String MYSQL_JOB_STATUS_RETRIEVER_PREFIX = "mysqlJobStatusRetriever";
+  public static final String GET_LATEST_JOB_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+      MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestJobStatus");
+  public static final String GET_LATEST_FLOW_STATUS_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+      MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getLatestFlowStatus");
+  public static final String GET_ALL_FLOW_STATUSES_METRIC = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+      MYSQL_JOB_STATUS_RETRIEVER_PREFIX, "getAllFlowStatuses");
 
-  public static final String CONF_PREFIX = "mysqlJobStatusRetriever";
   @Getter
   private MysqlJobStatusStateStore<State> stateStore;
 
   public MysqlJobStatusRetriever(Config config) throws ReflectiveOperationException {
-    config = config.getConfig(CONF_PREFIX).withFallback(config);
+    config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
     this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
   }
 
@@ -55,7 +62,10 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
   public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
-      List<State> jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+      List<State> jobStatusStates;
+      try (Timer.Context context = this.metricContext.contextAwareTimer(GET_LATEST_FLOW_STATUS_METRIC).time()) {
+        jobStatusStates = this.stateStore.getAll(storeName, flowExecutionId);
+      }
       return getJobStatuses(jobStatusStates);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -69,7 +79,10 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     String tableName = KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, jobName);
 
     try {
-      List<State> jobStatusStates = this.stateStore.getAll(storeName, tableName);
+      List<State> jobStatusStates;
+      try (Timer.Context context = this.metricContext.contextAwareTimer(GET_LATEST_JOB_STATUS_METRIC).time()) {
+        jobStatusStates = this.stateStore.getAll(storeName, tableName);
+      }
       return getJobStatuses(jobStatusStates);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -81,9 +94,11 @@ public class MysqlJobStatusRetriever extends JobStatusRetriever {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
 
     try {
-      List<State> jobStatusStates = this.stateStore.getAll(storeName);
-      List<Long> flowExecutionIds = jobStatusStates.stream()
-          .map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
+      List<State> jobStatusStates;
+      try (Timer.Context context = this.metricContext.contextAwareTimer(GET_ALL_FLOW_STATUSES_METRIC).time()) {
+        jobStatusStates = this.stateStore.getAll(storeName);
+      }
+      List<Long> flowExecutionIds = jobStatusStates.stream().map(state -> Long.parseLong(state.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))
           .collect(Collectors.toList());
       return ImmutableList.copyOf(Iterables.limit(new TreeSet<>(flowExecutionIds).descendingSet(), count));
     } catch (IOException e) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 48fd157..88e60f5 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -18,13 +18,15 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -39,9 +41,9 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
 
     ConfigBuilder configBuilder = ConfigBuilder.create();
-    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
-    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
-    configBuilder.addPrimitive(MysqlJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
+    configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
 
     this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build());
     this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore();