You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 20:12:49 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1168] add metrics in all SpecStore implementations

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f84a01  [GOBBLIN-1168] add metrics in all SpecStore implementations
3f84a01 is described below

commit 3f84a013debd8abcb5af13eca51e8b803dc8e578
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 1 13:12:35 2020 -0700

    [GOBBLIN-1168] add metrics in all SpecStore implementations
    
    Closes #3001 from arjun4084346/flowSpecFields2
---
 .../service/FlowConfigResourceLocalHandler.java    |  49 +------
 gobblin-runtime/build.gradle                       |   2 +
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  49 +++++++
 .../gobblin/runtime/api/InstrumentedSpecStore.java | 162 +++++++++++++++++++++
 .../gobblin/runtime/spec_store/FSSpecStore.java    |  48 +++---
 .../gobblin/runtime/spec_store/MysqlSpecStore.java |  61 ++++++--
 .../runtime/spec_store/MysqlSpecStoreTest.java     |  85 ++++++++---
 ...ControllerUserDefinedMessageHandlerFactory.java |   2 +-
 .../service/modules/orchestration/DagManager.java  |   6 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   8 +-
 10 files changed, 357 insertions(+), 115 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 49e5a72..eaa62a5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -81,7 +81,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
 
     try {
-      URI flowUri = FlowUriUtils.createFlowSpecUri(flowId);
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
       FlowConfig flowConfig = new FlowConfig();
       Properties flowProps = spec.getConfigAsProperties();
@@ -207,7 +207,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     URI flowUri = null;
 
     try {
-      flowUri = FlowUriUtils.createFlowSpecUri(flowId);
+      flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       this.flowCatalog.remove(flowUri, header, triggerListener);
       return new UpdateResponse(HttpStatus.S_200_OK);
     } catch (URISyntaxException e) {
@@ -282,49 +282,4 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
     }
   }
-
-  public static class FlowUriUtils {
-    private final static String URI_SCHEME = "gobblin-flow";
-    private final static String URI_AUTHORITY = null;
-    private final static String URI_PATH_SEPARATOR = "/";
-    private final static String URI_QUERY = null;
-    private final static String URI_FRAGMENT = null;
-    private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
-
-    public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
-      return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
-    }
-
-    private static String createUriPath(FlowId flowId) {
-      return URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
-    }
-
-    /**
-     * returns the flow name from the flowUri
-     * @param flowUri FlowUri
-     * @return null if the provided flowUri is not valid
-     */
-    public static String getFlowName(URI flowUri) {
-      String[] uriTokens = flowUri.getPath().split("/");
-      if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
-        log.error("Invalid URI {}.", flowUri);
-        return null;
-      }
-      return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 1];
-    }
-
-    /**
-     * returns the flow group from the flowUri
-     * @param flowUri FlowUri
-     * @return null if the provided flowUri is not valid
-     */
-    public static String getFlowGroup(URI flowUri) {
-      String[] uriTokens = flowUri.getPath().split("/");
-      if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
-        log.error("Invalid URI {}.", flowUri);
-        return null;
-      }
-      return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
-    }
-  }
 }
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 306036c..8b0e4e9 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -44,6 +44,8 @@ dependencies {
   compile project(":gobblin-utility")
   compile project(path: ':gobblin-rest-service:gobblin-rest-api', configuration: 'restClient')
   compile project(path: ':gobblin-rest-service:gobblin-rest-api', configuration: 'dataTemplate')
+  compile project(path: ':gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-api', configuration: 'restClient')
+  compile project(path: ':gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-api', configuration: 'dataTemplate')
   compile project(":gobblin-rest-service:gobblin-rest-server")
   compile project(":gobblin-modules:google-ingestion")
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 8721e41..fb27380 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -34,8 +34,11 @@ import java.util.Properties;
 import java.util.Set;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -358,4 +361,50 @@ public class FlowSpec implements Configurable, Spec {
   public Boolean isExplain() {
     return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
   }
+
+  /** Static helpers that build flow spec URIs and read the flow group and flow name back out of them. */
+  @Slf4j
+  public static class Utils {
+    private final static String URI_SCHEME = "gobblin-flow";
+    private final static String URI_AUTHORITY = null;
+    private final static String URI_PATH_SEPARATOR = "/";
+    private final static String URI_QUERY = null;
+    private final static String URI_FRAGMENT = null;
+    private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
+
+    /** Builds a URI with scheme "gobblin-flow" and path "/flowGroup/flowName" taken from flowId. */
+    public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+      String uriPath = URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
+      return new URI(URI_SCHEME, URI_AUTHORITY, uriPath, URI_QUERY, URI_FRAGMENT);
+    }
+
+    /**
+     * returns the flow name (the last path token) from the flowUri
+     * @param flowUri FlowUri
+     * @return null if the provided flowUri is not valid
+     */
+    public static String getFlowName(URI flowUri) {
+      return getPathToken(flowUri, EXPECTED_NUM_URI_PATH_TOKENS - 1);
+    }
+
+    /**
+     * returns the flow group (the second to last path token) from the flowUri
+     * @param flowUri FlowUri
+     * @return null if the provided flowUri is not valid
+     */
+    public static String getFlowGroup(URI flowUri) {
+      return getPathToken(flowUri, EXPECTED_NUM_URI_PATH_TOKENS - 2);
+    }
+
+    /** Returns the path token at index, or logs and returns null when flowUri lacks the expected 3-token path. */
+    private static String getPathToken(URI flowUri, int index) {
+      // Opaque URIs (e.g. "mailto:x") have a null path; report them as invalid instead of throwing an NPE.
+      String[] uriTokens = flowUri.getPath() == null ? new String[0] : flowUri.getPath().split(URI_PATH_SEPARATOR);
+      if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+        log.error("Invalid URI {}.", flowUri);
+        return null;
+      }
+      return uriTokens[index];
+    }
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
new file mode 100644
index 0000000..442fdc9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Instrumented version of {@link SpecStore} automatically capturing certain metrics.
+ * Subclasses should implement addSpecImpl instead of addSpec and so on.
+ * Only calls that return normally are timed; a call that throws leaves its timer untouched.
+ */
+public abstract class InstrumentedSpecStore implements SpecStore {
+  private final Optional<Timer> getTimer;
+  private final Optional<Timer> existsTimer;
+  private final Optional<Timer> deleteTimer;
+  private final Optional<Timer> addTimer;
+  private final Optional<Timer> updateTimer;
+  private final Optional<Timer> getAllTimer;
+  private final Optional<Timer> getURIsTimer;
+  private final MetricContext metricContext;
+  private final boolean instrumentationEnabled;
+
+  /** Creates one timer per store operation if GobblinMetrics is enabled in config; specSerDe is not used here. */
+  public InstrumentedSpecStore(Config config, SpecSerDe specSerDe) {
+    this.instrumentationEnabled = GobblinMetrics.isEnabled(new State(ConfigUtils.configToProperties(config)));
+    this.metricContext = Instrumented.getMetricContext(new State(), getClass());
+    this.getTimer = createTimer("-GET");
+    this.existsTimer = createTimer("-EXISTS");
+    this.deleteTimer = createTimer("-DELETE");
+    this.addTimer = createTimer("-ADD");
+    this.updateTimer = createTimer("-UPDATE");
+    this.getAllTimer = createTimer("-GETALL");
+    this.getURIsTimer = createTimer("-GETURIS");
+  }
+
+  /** Returns a timer named from the service prefix, the concrete class name and suffix; absent when disabled. */
+  private Optional<Timer> createTimer(String suffix) {
+    if (!instrumentationEnabled) {
+      return Optional.absent();
+    }
+    String name = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix);
+    return Optional.of(this.metricContext.timer(name));
+  }
+
+  @Override
+  public boolean exists(URI specUri) throws IOException {
+    if (!instrumentationEnabled) {
+      return existsImpl(specUri);
+    }
+    long startNanos = System.nanoTime(); // monotonic clock: immune to wall-clock adjustments
+    boolean found = existsImpl(specUri);
+    Instrumented.updateTimer(this.existsTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return found;
+  }
+
+  @Override
+  public void addSpec(Spec spec) throws IOException {
+    if (!instrumentationEnabled) {
+      addSpecImpl(spec);
+      return;
+    }
+    long startNanos = System.nanoTime();
+    addSpecImpl(spec);
+    Instrumented.updateTimer(this.addTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public boolean deleteSpec(URI specUri) throws IOException {
+    if (!instrumentationEnabled) {
+      return deleteSpecImpl(specUri);
+    }
+    long startNanos = System.nanoTime();
+    boolean deleted = deleteSpecImpl(specUri);
+    Instrumented.updateTimer(this.deleteTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return deleted;
+  }
+
+  @Override
+  public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+    if (!instrumentationEnabled) {
+      return getSpecImpl(specUri);
+    }
+    long startNanos = System.nanoTime();
+    Spec spec = getSpecImpl(specUri);
+    Instrumented.updateTimer(this.getTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return spec;
+  }
+
+  @Override
+  public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+    if (!instrumentationEnabled) {
+      return updateSpecImpl(spec);
+    }
+    long startNanos = System.nanoTime();
+    Spec updated = updateSpecImpl(spec);
+    Instrumented.updateTimer(this.updateTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return updated;
+  }
+
+  @Override
+  public Collection<Spec> getSpecs() throws IOException {
+    if (!instrumentationEnabled) {
+      return getSpecsImpl();
+    }
+    long startNanos = System.nanoTime();
+    Collection<Spec> specs = getSpecsImpl();
+    Instrumented.updateTimer(this.getAllTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return specs;
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIs() throws IOException {
+    if (!instrumentationEnabled) {
+      return getSpecURIsImpl();
+    }
+    long startNanos = System.nanoTime();
+    Iterator<URI> specURIs = getSpecURIsImpl();
+    Instrumented.updateTimer(this.getURIsTimer, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    return specURIs;
+  }
+
+  // Uninstrumented store operations; each one is wrapped by the public method of the corresponding name above.
+  public abstract void addSpecImpl(Spec spec) throws IOException;
+  public abstract Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException;
+  public abstract boolean existsImpl(URI specUri) throws IOException;
+  public abstract Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException;
+  public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
+  public abstract Collection<Spec> getSpecsImpl() throws IOException;
+  public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index feee4a9..14a3069 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -17,23 +17,11 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.util.PathUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,6 +33,21 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.util.PathUtils;
+
 
 /**
  * The Spec Store for file system to persist the Spec information.
@@ -53,7 +56,7 @@ import org.slf4j.LoggerFactory;
  * 2. This implementation does not performs implicit version management.
  *    For implicit version management, please use a wrapper FSSpecStore.
  */
-public class FSSpecStore implements SpecStore {
+public class FSSpecStore extends InstrumentedSpecStore {
 
   /***
    * Configuration properties related to Spec Store
@@ -83,6 +86,7 @@ public class FSSpecStore implements SpecStore {
 
   public FSSpecStore(Config sysConfig, SpecSerDe specSerDe, Optional<Logger> log)
       throws IOException {
+    super(sysConfig, specSerDe);
     Preconditions.checkArgument(sysConfig.hasPath(SPECSTORE_FS_DIR_KEY),
         "FS SpecStore path must be specified.");
 
@@ -141,7 +145,7 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public boolean exists(URI specUri) throws IOException {
+  public boolean existsImpl(URI specUri) throws IOException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
 
     Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
@@ -149,7 +153,7 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public void addSpec(Spec spec) throws IOException {
+  public void addSpecImpl(Spec spec) throws IOException {
     Preconditions.checkArgument(null != spec, "Spec should not be null");
 
     log.info(String.format("Adding Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
@@ -165,10 +169,10 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public boolean deleteSpec(URI specUri) throws IOException {
+  public boolean deleteSpecImpl(URI specUri) throws IOException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
 
-      return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
+    return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
   }
 
   @Override
@@ -186,13 +190,13 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+  public Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException {
     addSpec(spec);
     return spec;
   }
 
   @Override
-  public Spec getSpec(URI specUri) throws SpecNotFoundException {
+  public Spec getSpecImpl(URI specUri) throws SpecNotFoundException {
     Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
 
     Collection<Spec> specs = getAllVersionsOfSpec(specUri);
@@ -228,7 +232,7 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public Collection<Spec> getSpecs() throws IOException {
+  public Collection<Spec> getSpecsImpl() throws IOException {
     Collection<Spec> specs = Lists.newArrayList();
     try {
       getSpecs(this.fsSpecStoreDirPath, specs);
@@ -240,7 +244,7 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
-  public Iterator<URI> getSpecURIs() throws IOException {
+  public Iterator<URI> getSpecURIsImpl() throws IOException {
     final RemoteIterator<LocatedFileStatus> it = fs.listFiles(this.fsSpecStoreDirPath, true);
     return new Iterator<URI>() {
       @Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 53a2b7e..a5a0d2b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -33,6 +33,7 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
 import com.typesafe.config.Config;
 
 import javax.sql.DataSource;
@@ -41,6 +42,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
@@ -48,6 +51,9 @@ import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+
 
 /**
  * Implementation of {@link SpecStore} that stores specs as serialized java objects in MySQL. Note that versions are not
@@ -59,7 +65,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * but not removing it from {@link SpecStore}.
  */
 @Slf4j
-public class MysqlSpecStore implements SpecStore {
+public class MysqlSpecStore extends InstrumentedSpecStore {
   public static final String CONFIG_PREFIX = "mysqlSpecStore";
   public static final String DEFAULT_TAG_VALUE = "";
   private static final String NEW_COLUMN = "spec_json";
@@ -72,8 +78,9 @@ public class MysqlSpecStore implements SpecStore {
           + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
           + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
-  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec, " + NEW_COLUMN + ") "
-      + "VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+      + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
   private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
@@ -86,6 +93,7 @@ public class MysqlSpecStore implements SpecStore {
   protected final SpecSerDe specSerDe;
 
   public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+    super(config, specSerDe);
     if (config.hasPath(CONFIG_PREFIX)) {
       config = config.getConfig(CONFIG_PREFIX).withFallback(config);
     }
@@ -104,7 +112,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
-  public boolean exists(URI specUri) throws IOException {
+  public boolean existsImpl(URI specUri) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
       statement.setString(1, specUri.toString());
@@ -118,7 +126,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
-  public void addSpec(Spec spec) throws IOException {
+  public void addSpecImpl(Spec spec) throws IOException {
     this.addSpec(spec, DEFAULT_TAG_VALUE);
   }
 
@@ -128,10 +136,7 @@ public class MysqlSpecStore implements SpecStore {
   public void addSpec(Spec spec, String tagValue) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-      statement.setString(1, spec.getUri().toString());
-      statement.setString(2, tagValue);
-      statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
-      statement.setString(4, new String(this.specSerDe.serialize(spec), Charsets.UTF_8));
+      setPreparedStatement(statement, spec, tagValue);
       statement.executeUpdate();
       connection.commit();
     } catch (SQLException | SpecSerDeException e) {
@@ -145,7 +150,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
-  public boolean deleteSpec(URI specUri) throws IOException {
+  public boolean deleteSpecImpl(URI specUri) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
       statement.setString(1, specUri.toString());
@@ -164,13 +169,13 @@ public class MysqlSpecStore implements SpecStore {
 
   @Override
   // TODO : this method is not doing what the contract is in the SpecStore interface
-  public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+  public Spec updateSpecImpl(Spec spec) throws IOException {
     addSpec(spec);
     return spec;
   }
 
   @Override
-  public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+  public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
       statement.setString(1, specUri.toString());
@@ -199,7 +204,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
-  public Collection<Spec> getSpecs() throws IOException {
+  public Collection<Spec> getSpecsImpl() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
       List<Spec> specs = new ArrayList<>();
@@ -217,7 +222,6 @@ public class MysqlSpecStore implements SpecStore {
           }
         }
       }
-
       return specs;
     } catch (SQLException e) {
       throw new IOException(e);
@@ -225,7 +229,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
-  public Iterator<URI> getSpecURIs() throws IOException {
+  public Iterator<URI> getSpecURIsImpl() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
       return getURIIteratorByQuery(statement);
@@ -262,4 +266,31 @@ public class MysqlSpecStore implements SpecStore {
   public Optional<URI> getSpecStoreURI() {
     return Optional.of(this.specStoreURI);
   }
+
+  /** Binds the twelve INSERT_STATEMENT parameters from spec, which must be a {@link FlowSpec}, and tagValue. */
+  protected void setPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+    FlowSpec flowSpec = (FlowSpec) spec;
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    String userToProxy = ConfigUtils.getString(flowConfig, "user.to.proxy", null);
+    String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
+    String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
+    String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+    boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
+    // Serialize once so the spec blob and the spec_json column are written from the very same bytes.
+    byte[] serializedSpec = this.specSerDe.serialize(flowSpec);
+    statement.setString(1, flowSpec.getUri().toString());
+    statement.setString(2, flowGroup);
+    statement.setString(3, flowName);
+    statement.setString(4, new Gson().toJson(flowSpec.getTemplateURIs()));
+    statement.setString(5, userToProxy);
+    statement.setString(6, sourceIdentifier);
+    statement.setString(7, destinationIdentifier);
+    statement.setString(8, schedule);
+    statement.setString(9, tagValue);
+    statement.setBoolean(10, isRunImmediately);
+    statement.setBlob(11, new ByteArrayInputStream(serializedSpec));
+    statement.setString(12, new String(serializedSpec, Charsets.UTF_8));
+  }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 16e08b8..6762f3c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.spec_store;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -28,11 +29,14 @@ import java.util.Collection;
 import java.util.Iterator;
 
 import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
 
@@ -45,6 +49,10 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
+import org.apache.gobblin.service.FlowId;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
 
 
 public class MysqlSpecStoreTest {
@@ -54,24 +62,14 @@ public class MysqlSpecStoreTest {
 
   private MysqlSpecStore specStore;
   private MysqlSpecStore oldSpecStore;
-  private URI uri1 = URI.create("flowspec1");
-  private URI uri2 = URI.create("flowspec2");
-  private URI uri3 = URI.create("flowspec3");
-  private FlowSpec flowSpec1 = FlowSpec.builder(this.uri1)
-      .withConfig(ConfigBuilder.create().addPrimitive("key", "value").build())
-      .withDescription("Test flow spec")
-      .withVersion("Test version")
-      .build();
-  private FlowSpec flowSpec2 = FlowSpec.builder(this.uri2)
-      .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2").build())
-      .withDescription("Test flow spec 2")
-      .withVersion("Test version 2")
-      .build();
-  private FlowSpec flowSpec3 = FlowSpec.builder(this.uri3)
-      .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3").build())
-      .withDescription("Test flow spec 3")
-      .withVersion("Test version 3")
-      .build();
+  private URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("fg1").setFlowName("fn1")); // same group/name as flowSpec1's config
+  private URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("fg2").setFlowName("fn2")); // same group/name as flowSpec2's config
+  private URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("fg3").setFlowName("fn3")); // same group/name as flowSpec3's config
+  private FlowSpec flowSpec1, flowSpec2, flowSpec3; // assigned in setUp()
+
+  public MysqlSpecStoreTest() throws URISyntaxException {
+    // NOTE(review): intentionally empty; the throws clause is presumably required by the uri1..uri3 field initializers - confirm.
+  }
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -86,6 +84,39 @@ public class MysqlSpecStoreTest {
 
     this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
     this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOW_SOURCE_IDENTIFIER_KEY, "source");
+    properties.setProperty(FLOW_DESTINATION_IDENTIFIER_KEY, "destination");
+
+    flowSpec1 = FlowSpec.builder(this.uri1)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key", "value")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn1").build())
+        .withDescription("Test flow spec")
+        .withVersion("Test version")
+        .build();
+    flowSpec2 = FlowSpec.builder(this.uri2)
+        .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn2").build())
+        .withDescription("Test flow spec 2")
+        .withVersion("Test version 2")
+        .build();
+    flowSpec3 = FlowSpec.builder(this.uri3)
+        .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn3").build())
+        .withDescription("Test flow spec 3")
+        .withVersion("Test version 3")
+        .build();
   }
 
   @Test
@@ -118,14 +149,24 @@ public class MysqlSpecStoreTest {
     //Creating and inserting flowspecs with tags
     URI uri4 = URI.create("flowspec4");
     FlowSpec flowSpec4 = FlowSpec.builder(uri4)
-        .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4").build())
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+            .addPrimitive("key4", "value4").build())
         .withDescription("Test flow spec 4")
         .withVersion("Test version 4")
         .build();
 
     URI uri5 = URI.create("flowspec5");
     FlowSpec flowSpec5 = FlowSpec.builder(uri5)
-        .withConfig(ConfigBuilder.create().addPrimitive("key5", "value5").build())
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
+            .addPrimitive("key5", "value5").build())
         .withDescription("Test flow spec 5")
         .withVersion("Test version 5")
         .build();
@@ -173,9 +214,7 @@ public class MysqlSpecStoreTest {
     public void addSpec(Spec spec, String tagValue) throws IOException {
       try (Connection connection = this.dataSource.getConnection();
           PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-        statement.setString(1, spec.getUri().toString());
-        statement.setString(2, tagValue);
-        statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+        setPreparedStatement(statement, spec, tagValue);
         statement.setString(4, null);
         statement.executeUpdate();
         connection.commit();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index 6181ff9..08a9cb5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -152,7 +152,7 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
         FlowId id = FlowConfigUtils.deserializeFlowId(msg);
         if (flowCatalogLocalCommit) {
           // in balance mode, flow spec is already deleted in flow catalog on standby node.
-          URI flowUri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(id);
+          URI flowUri = FlowSpec.Utils.createFlowSpecUri(id);
           log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", flowUri);
           jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION);
         } else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e2a11b6..f323ee3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -62,12 +62,12 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -284,8 +284,8 @@ public class DagManager extends AbstractIdleService {
    * The {@link DagManager} adds the dag to the {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
    */
   synchronized public void stopDag(URI uri) throws IOException {
-    String flowGroup = FlowConfigResourceLocalHandler.FlowUriUtils.getFlowGroup(uri);
-    String flowName = FlowConfigResourceLocalHandler.FlowUriUtils.getFlowName(uri);
+    String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+    String flowName = FlowSpec.Utils.getFlowName(uri);
 
     List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
     log.info("Found {} flows to cancel.", flowExecutionIds.size());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index edc881f..85b8bca 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -37,9 +37,9 @@ import com.typesafe.config.ConfigValueFactory;
 import javax.annotation.Nullable;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -103,9 +103,9 @@ public class DagManagerFlowTest {
         assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
 
     // mock delete spec
-    dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
-    dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
-    dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
+    dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
+    dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
+    dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
 
     // verify deleteSpec() of specProducer is called once
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);