You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/03 23:15:07 UTC

[gobblin] branch master updated: [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b1aed5c  [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)
b1aed5c is described below

commit b1aed5c865734b4aaab5acec6ed8c78bd7ada886
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Wed Nov 3 16:15:00 2021 -0700

    [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)
    
    * Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage
    
    * Streamline `JobSpecDeserializer` error handling, on review feedback.
    
    * Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.
    
    * Fix javadoc slipup
---
 .../runtime/job_catalog/ImmutableFSJobCatalog.java |   1 -
 .../runtime/job_catalog/MysqlJobCatalog.java       | 215 +++++++++++++++++++++
 ...lowSpecSerDe.java => GenericGsonSpecSerDe.java} |  25 +--
 .../runtime/spec_serde/GsonFlowSpecSerDe.java      |  35 +---
 .../runtime/spec_serde/GsonJobSpecSerDe.java       |  33 ++++
 .../runtime/spec_serde/JobSpecDeserializer.java    |  77 ++++++++
 .../runtime/spec_serde/JobSpecSerializer.java      |  47 +++++
 .../runtime/job_catalog/TestMysqlJobCatalog.java   | 212 ++++++++++++++++++++
 8 files changed, 601 insertions(+), 44 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 86e2b96..7ad4f11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -208,7 +208,6 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
 
   /**
    * Fetch single job file based on its URI,
-   * return null requested URI not existed
    * @param uri The relative Path to the target job configuration.
    * @return
    */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
new file mode 100644
index 0000000..14a6829
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting {@link JobSpec} job configuration information, which fully supports (mutation)
+ * listeners and metrics.
+ */
+public class MysqlJobCatalog extends JobCatalogBase implements MutableJobCatalog {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MysqlJobCatalog.class);
+  public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
+
+  protected final MutableJobCatalog.MutableStandardMetrics mutableMetrics;
+  protected final MysqlBaseSpecStore jobSpecStore;
+
+  /**
+   * Initialize, with DB config contextualized by {@link #DB_CONFIG_PREFIX}.
+   * Delegates to the three-argument constructor, passing no parent {@link MetricContext} and enabling
+   * instrumentation according to {@code GobblinMetrics.isEnabled(sysConfig)}.
+   *
+   * @param sysConfig system configuration; also handed on to the backing spec store
+   * @throws IOException as declared by the delegated-to constructor
+   */
+  public MysqlJobCatalog(Config sysConfig)
+      throws IOException {
+      this(sysConfig, Optional.<MetricContext>absent(), GobblinMetrics.isEnabled(sysConfig));
+  }
+
+  /**
+   * Initialize from a {@link GobblinInstanceEnvironment}, creating the backing spec store from the environment's
+   * system config ({@code env.getSysConfig().getConfig()}).
+   * NOTE(review): the cast of {@code metrics} presumably relies on the base class having populated it through
+   * {@link #createStandardMetrics(Optional)} - confirm against {@code JobCatalogBase}.
+   *
+   * @param env the instance environment supplying the system config
+   * @throws IOException as declared; store-creation failures surface as {@link RuntimeException} instead
+   *         (see {@link #createJobSpecStore(Config)})
+   */
+  public MysqlJobCatalog(GobblinInstanceEnvironment env) throws IOException {
+    super(env);
+    this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+    this.jobSpecStore = createJobSpecStore(env.getSysConfig().getConfig());
+  }
+
+  /**
+   * Initialize with explicit metrics settings, creating the backing spec store from {@code sysConfig}.
+   * NOTE(review): the cast of {@code metrics} presumably relies on the base class having populated it through
+   * {@link #createStandardMetrics(Optional)} - confirm against {@code JobCatalogBase}.
+   *
+   * @param sysConfig system configuration; passed both to the base class and to the backing spec store
+   * @param parentMetricContext optional parent {@link MetricContext}, passed through to the base class
+   * @param instrumentationEnabled instrumentation flag, passed through to the base class
+   * @throws IOException as declared; store-creation failures surface as {@link RuntimeException} instead
+   *         (see {@link #createJobSpecStore(Config)})
+   */
+  public MysqlJobCatalog(Config sysConfig, Optional<MetricContext> parentMetricContext,
+      boolean instrumentationEnabled) throws IOException {
+    super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig));
+    this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+    this.jobSpecStore = createJobSpecStore(sysConfig);
+  }
+
+  /**
+   * Create the metrics object for this catalog: always a {@link MutableJobCatalog.MutableStandardMetrics}, which
+   * is what the constructors cast {@code metrics} to. Logs the metrics class and this catalog's class at INFO.
+   *
+   * @param sysConfig optional system config, handed to the metrics constructor
+   * @return a new {@link MutableJobCatalog.MutableStandardMetrics} bound to this catalog
+   */
+  @Override
+  protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
+    log.info("create standard metrics {} for {}", MutableJobCatalog.MutableStandardMetrics.class.getName(), this.getClass().getName());
+    return new MutableJobCatalog.MutableStandardMetrics(this, sysConfig);
+  }
+
+  /**
+   * Create the backing {@link MysqlBaseSpecStore}: an anonymous subclass using {@link GsonJobSpecSerDe} for
+   * (de)serialization and reporting {@link #DB_CONFIG_PREFIX} as its config prefix.
+   * Called from the constructors, so overriding implementations must not depend on subclass state.
+   *
+   * @param sysConfig system configuration handed to the store
+   * @return the newly created store
+   * @throws RuntimeException wrapping any {@link IOException} raised while creating the store
+   */
+  protected MysqlBaseSpecStore createJobSpecStore(Config sysConfig) {
+    try {
+      return new MysqlBaseSpecStore(sysConfig, new GsonJobSpecSerDe()) {
+        @Override
+        protected String getConfigPrefix() {
+          return MysqlJobCatalog.DB_CONFIG_PREFIX;
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException("unable to create `JobSpec` store", e);
+    }
+  }
+
+  /**
+   * Fetch every stored {@link JobSpec} in one call to the backing store.
+   * NOTE(review): the raw {@code (List)} cast is unchecked; it assumes the store returns a {@link List} whose
+   * elements are all {@link JobSpec}s - confirm against {@code MysqlBaseSpecStore#getSpecs()}.
+   *
+   * @return all {@link JobSpec}s
+   * @throws RuntimeException wrapping any {@link IOException} raised by the store
+   */
+  @Override
+  public List<JobSpec> getJobs() {
+    try {
+      return (List) jobSpecStore.getSpecs();
+    } catch (IOException e) {
+      throw new RuntimeException("error getting (all) job specs", e);
+    }
+  }
+
+  /**
+   * Obtain an iterator to fetch all job specifications. Unlike {@link #getJobs()}, this method avoids loading
+   * all job configs into memory up front: it iterates over the stored URIs and resolves each one through
+   * {@link #getJobSpec(URI)}. A URI whose spec can no longer be found is logged at INFO and skipped.
+   * Interleaving notes: jobs added/modified/deleted between `Iterator` creation and exhaustion MAY or MAY NOT be reflected.
+   *
+   * @return an iterator for (all) present {@link JobSpec}s
+   * @throws RuntimeException wrapping any {@link IOException} raised while listing the stored URIs
+   */
+  @Override
+  public Iterator<JobSpec> getJobSpecIterator() {
+    try {
+      return Iterators.<Optional<JobSpec>, JobSpec>transform(
+          Iterators.filter(
+              Iterators.transform(jobSpecStore.getSpecURIs(), uri -> {
+                try {
+                  return Optional.of(MysqlJobCatalog.this.getJobSpec(uri));
+                } catch (JobSpecNotFoundException e) {
+                  MysqlJobCatalog.this.log.info("unable to retrieve previously identified JobSpec by URI '{}'", uri);
+                  return Optional.absent();
+                }}),
+              Optional::isPresent),
+          Optional::get);
+    } catch (IOException e) {
+      throw new RuntimeException("error iterating (all) job specs", e);
+    }
+  }
+
+  /**
+   * Fetch single {@link JobSpec} by URI.
+   *
+   * @param uri identifies the `JobSpec`; must not be null
+   * @return the `JobSpec`
+   * @throws JobSpecNotFoundException when the store holds no spec for {@code uri}
+   * @throws NullPointerException when {@code uri} is null
+   * @throws RuntimeException wrapping any {@link IOException} raised by the store
+   */
+  @Override
+  public JobSpec getJobSpec(URI uri)
+      throws JobSpecNotFoundException {
+    Preconditions.checkNotNull(uri);
+    try {
+      return (JobSpec) jobSpecStore.getSpec(uri);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("error accessing job spec '%s'", uri), e);
+    } catch (SpecNotFoundException e) {
+      throw new JobSpecNotFoundException(uri);
+    }
+  }
+
+  /**
+   * Add a new {@link JobSpec}, or update the existing one stored under the same URI, triggering the appropriate
+   * {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback ({@code onUpdateJob} or {@code onAddJob}).
+   * The put-time metric is recorded after the store call succeeds and before listeners are notified.
+   *
+   * NOTE: `synchronized` (w/ `remove()`) for integrity of (existence) check-then-update.
+   *
+   * @param jobSpec the spec to store; must not be null
+   * @throws IllegalStateException when this catalog is not in the RUNNING state
+   * @throws NullPointerException when {@code jobSpec} is null
+   * @throws RuntimeException wrapping any {@link IOException} (or unexpected {@link SpecNotFoundException})
+   *         raised by the store
+   */
+  @Override
+  public synchronized void put(JobSpec jobSpec) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(jobSpec);
+    try {
+      long startTime = System.currentTimeMillis();
+      boolean isUpdate = jobSpecStore.exists(jobSpec.getUri());
+      if (isUpdate) {
+        try {
+          jobSpecStore.updateSpec(jobSpec);
+          this.mutableMetrics.updatePutJobTime(startTime);
+          this.listeners.onUpdateJob(jobSpec);
+        } catch (SpecNotFoundException e) { // should never happen (since `synchronized`)
+          throw new RuntimeException(String.format("error finding spec to update '%s'", jobSpec.getUri()), e);
+        }
+      } else {
+        jobSpecStore.addSpec(jobSpec);
+        this.mutableMetrics.updatePutJobTime(startTime);
+        this.listeners.onAddJob(jobSpec);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("error updating or adding JobSpec '%s'", jobSpec.getUri()), e);
+    }
+  }
+
+  /**
+   * Delete (an existing) {@link JobSpec}, triggering the appropriate {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback.
+   * Equivalent to {@code remove(jobURI, false)}: listeners are not notified of a cancellation when the spec is unknown.
+   *
+   * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+   *
+   * @param jobURI identifies the `JobSpec` to delete
+   */
+  @Override
+  public synchronized void remove(URI jobURI) {
+    remove(jobURI, false);
+  }
+
+  /**
+   * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+   *
+   * @param alwaysTriggerListeners whether invariably to trigger {@link org.apache.gobblin.runtime.api.JobCatalogListener#onCancelJob(URI)}
+   */
+  @Override
+  public synchronized void remove(URI jobURI, boolean alwaysTriggerListeners) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(jobURI);
+    try {
+      long startTime = System.currentTimeMillis();
+      JobSpec jobSpec = (JobSpec) jobSpecStore.getSpec(jobURI);
+      jobSpecStore.deleteSpec(jobURI);
+      this.mutableMetrics.updateRemoveJobTime(startTime);
+      this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
+    } catch (SpecNotFoundException e) {
+      LOGGER.warn("Unknown job spec URI: '" + jobURI + "'.  Deletion failed.");
+    } catch (IOException e) {
+      throw new RuntimeException("When removing a JobConf. file, issues unexpected happen:" + e.getMessage());
+    } finally {
+      // HelixRetriggeringJobCallable deletes the job file after submitting it to helix,
+      // so trigger listeners regardless of its existence.
+      if (alwaysTriggerListeners) {
+        this.listeners.onCancelJob(jobURI);
+      }
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
similarity index 62%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
copy to gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
index dd8d20d..7fe083a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
@@ -19,33 +19,36 @@ package org.apache.gobblin.runtime.spec_serde;
 
 import com.google.common.base.Charsets;
 import com.google.gson.Gson;
+import com.google.gson.JsonDeserializer;
 import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-
-import org.apache.gobblin.runtime.api.FlowSpec;
+import com.google.gson.JsonSerializer;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 
 
 /**
- * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that currently only {@link FlowSpec}s are supported.
+ * Abstract {@link SpecSerDe} providing scaffolding to serialize a specific {@link Spec} derived class as JSON using {@link Gson}.
+ * Extending classes MUST supply a per-instance serializer and a deserializer and SHOULD be concrete (non-generic), to permit
+ * naming/reference from properties-based configuration.
  */
-public class GsonFlowSpecSerDe implements SpecSerDe {
-  private GsonSerDe<FlowSpec> gsonSerDe;
+public abstract class GenericGsonSpecSerDe<T extends Spec> implements SpecSerDe {
+  private final GsonSerDe<T> gsonSerDe;
+  private final Class<T> specClass;
 
-  public GsonFlowSpecSerDe() {
-    this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(), new FlowSpecSerializer(), new FlowSpecDeserializer());
+  protected GenericGsonSpecSerDe(Class<T> specClass, JsonSerializer<T> serializer, JsonDeserializer<T> deserializer) {
+    this.specClass = specClass;
+    this.gsonSerDe = new GsonSerDe<T>(specClass, serializer, deserializer);
   }
 
   @Override
   public byte[] serialize(Spec spec) throws SpecSerDeException {
-    if (!(spec instanceof FlowSpec)) {
-      throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only FlowSpec is supported");
+    if (!specClass.isInstance(spec)) {
+      throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only " + specClass.getName() + " is supported");
     }
 
     try {
-      return this.gsonSerDe.serialize((FlowSpec) spec).getBytes(Charsets.UTF_8);
+      return this.gsonSerDe.serialize(specClass.cast(spec)).getBytes(Charsets.UTF_8);
     } catch (JsonParseException e) {
       throw new SpecSerDeException(spec, e);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
index dd8d20d..21e9140 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
@@ -17,46 +17,17 @@
 
 package org.apache.gobblin.runtime.spec_serde;
 
-import com.google.common.base.Charsets;
 import com.google.gson.Gson;
-import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
 
 
 /**
- * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that currently only {@link FlowSpec}s are supported.
+ * {@link SpecSerDe} for {@link FlowSpec}s that serializes as JSON using {@link Gson}.
  */
-public class GsonFlowSpecSerDe implements SpecSerDe {
-  private GsonSerDe<FlowSpec> gsonSerDe;
+public class GsonFlowSpecSerDe extends GenericGsonSpecSerDe<FlowSpec> {
 
   public GsonFlowSpecSerDe() {
-    this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(), new FlowSpecSerializer(), new FlowSpecDeserializer());
-  }
-
-  @Override
-  public byte[] serialize(Spec spec) throws SpecSerDeException {
-    if (!(spec instanceof FlowSpec)) {
-      throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only FlowSpec is supported");
-    }
-
-    try {
-      return this.gsonSerDe.serialize((FlowSpec) spec).getBytes(Charsets.UTF_8);
-    } catch (JsonParseException e) {
-      throw new SpecSerDeException(spec, e);
-    }
-  }
-
-  @Override
-  public Spec deserialize(byte[] spec) throws SpecSerDeException {
-    try {
-      return this.gsonSerDe.deserialize(new String(spec, Charsets.UTF_8));
-    } catch (JsonParseException e) {
-      throw new SpecSerDeException(e);
-    }
+    super(FlowSpec.class, new FlowSpecSerializer(), new FlowSpecDeserializer());
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
new file mode 100644
index 0000000..3cc494e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.Gson;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+/**
+ * {@link SpecSerDe} for {@link JobSpec}s that serializes as JSON using {@link Gson}.
+ * Concrete (non-generic) binding of {@link GenericGsonSpecSerDe} to {@link JobSpec}, wiring in
+ * {@link JobSpecSerializer} and {@link JobSpecDeserializer}.
+ */
+public class GsonJobSpecSerDe extends GenericGsonSpecSerDe<JobSpec> {
+
+  /** Create an instance with a fresh {@link JobSpecSerializer} and {@link JobSpecDeserializer}. */
+  public GsonJobSpecSerDe() {
+    super(JobSpec.class, new JobSpecSerializer(), new JobSpecDeserializer());
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java
new file mode 100644
index 0000000..e09d32d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Gson {@link JsonDeserializer} rebuilding a {@link JobSpec} from the JSON object layout whose keys are defined
+ * by {@link JobSpecSerializer}.
+ */
+public class JobSpecDeserializer implements JsonDeserializer<JobSpec> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobSpecDeserializer.class);
+
+  /**
+   * Rebuild a {@link JobSpec} from its JSON form: URI, version, description, and config properties are read
+   * unconditionally, while the template URI is applied only when present and parseable.
+   * NOTE(review): a missing URI, version, or description key would surface as a {@link NullPointerException}
+   * from the chained {@code getAsString()} - confirm the serialized form always carries them.
+   *
+   * @param json the JSON element to read; must be a JSON object
+   * @param typeOfT unused
+   * @param context used to deserialize the config {@link Properties}
+   * @return the rebuilt {@link JobSpec}
+   * @throws JsonParseException when the config properties can be read neither as a JSON object nor as text in
+   *         {@link Properties} format
+   */
+  @Override
+  public JobSpec deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+    JsonObject jsonObject = json.getAsJsonObject();
+
+    String uri = jsonObject.get(JobSpecSerializer.JOB_SPEC_URI_KEY).getAsString();
+    String version = jsonObject.get(JobSpecSerializer.JOB_SPEC_VERSION_KEY).getAsString();
+    String description = jsonObject.get(JobSpecSerializer.JOB_SPEC_DESCRIPTION_KEY).getAsString();
+    // best-effort: an absent or malformed template URI is logged and treated as "no template", never fatal
+    Optional<URI> templateURI = Optional.ofNullable(jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY)).flatMap(jsonElem -> {
+      try {
+        return Optional.of(new URI(jsonElem.getAsString()));
+      } catch (URISyntaxException | RuntimeException e) {
+        LOGGER.warn(String.format("error deserializing '%s' as a URI: %s", jsonElem.toString(), e.getMessage()));
+        return Optional.empty();
+      }
+    });
+    Properties properties;
+
+    try {
+      properties = context.deserialize(jsonObject.get(JobSpecSerializer.JOB_SPEC_CONFIG_AS_PROPERTIES_KEY), Properties.class);
+    } catch (JsonParseException e) {
+      // for backward compatibility... (is this needed for `JobSpec`, or only for (inspiration) `FlowSpec`???)
+      properties = new Properties();
+      try {
+        properties.load(new StringReader(jsonObject.get(JobSpecSerializer.JOB_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
+      } catch (IOException ioe) {
+        // NOTE(review): wraps the original parse failure `e`, not `ioe` - looks deliberate, but confirm
+        throw new JsonParseException(e);
+      }
+    }
+
+    JobSpec.Builder builder = JobSpec.builder(uri).withVersion(version).withDescription(description).withConfigAsProperties(properties);
+    if (templateURI.isPresent()) {
+      builder = builder.withTemplate(templateURI.get());
+    }
+
+    return builder.build();
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java
new file mode 100644
index 0000000..01b57c2
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import org.apache.gobblin.runtime.api.JobSpec;
+
+
+/**
+ * Gson {@link JsonSerializer} writing a {@link JobSpec} as a flat JSON object holding its URI, version,
+ * description, (optional) template URI, and config properties. The key names are public constants, so that
+ * the matching deserializer can read the same layout.
+ */
+public class JobSpecSerializer implements JsonSerializer<JobSpec> {
+  public static final String JOB_SPEC_URI_KEY = "uri";
+  public static final String JOB_SPEC_VERSION_KEY = "version";
+  public static final String JOB_SPEC_DESCRIPTION_KEY = "description";
+  public static final String JOB_SPEC_TEMPLATE_URI_KEY = "templateURI";
+  public static final String JOB_SPEC_CONFIG_AS_PROPERTIES_KEY = "configAsProperties";
+
+  /**
+   * Serialize {@code src} into a JSON object keyed by the constants of this class.
+   *
+   * @param src the {@link JobSpec} to serialize
+   * @param typeOfSrc unused
+   * @param context used to serialize each individual field
+   * @return the JSON object representing {@code src}
+   */
+  @Override
+  public JsonElement serialize(JobSpec src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject jobSpecJson = new JsonObject();
+
+    jobSpecJson.add(JOB_SPEC_URI_KEY, context.serialize(src.getUri()));
+    jobSpecJson.add(JOB_SPEC_VERSION_KEY, context.serialize(src.getVersion()));
+    jobSpecJson.add(JOB_SPEC_DESCRIPTION_KEY, context.serialize(src.getDescription()));
+    // serialize the template *URI* itself (the value whose presence is checked), not the resolved job template:
+    // this key is read back as a URI on deserialization
+    jobSpecJson.add(JOB_SPEC_TEMPLATE_URI_KEY, src.getTemplateURI().isPresent() ? context.serialize(src.getTemplateURI().get()) : null);
+    jobSpecJson.add(JOB_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(src.getConfigAsProperties()));
+
+    return jobSpecJson;
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
new file mode 100644
index 0000000..72a914f
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Predicates;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.test.MetricsAssert;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobCatalogListener;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/** Verify {@link MysqlJobCatalog} [modeled on {@link TestInMemoryJobCatalog}] */
+public class TestMysqlJobCatalog {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "job_catalog";
+
+  private MysqlJobCatalog cat;
+
+  /**
+   * Create a new DB/`JobCatalog` for each test, so they're completely independent.
+   * Metrics are enabled, and the DB URL, user, password, and table are all configured beneath
+   * {@link MysqlJobCatalog#DB_CONFIG_PREFIX}.
+   */
+  @BeforeMethod
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true")
+        .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.cat = new MysqlJobCatalog(config);
+  }
+
+  /**
+   * Verify listener callbacks: adds, updates, and deletes made while the listener is registered are reported,
+   * whereas removing an unknown URI, or removing after the listener is deregistered, triggers nothing further.
+   * NOTE(review): `js1_1` is put before the listener is registered, yet `onAddJob(js1_1)` is expected -
+   * presumably `addListener()` replays pre-existing jobs; confirm against `JobCatalogBase`.
+   */
+  @Test
+  public void testCallbacks() throws Exception {
+    cat.startAsync();
+    cat.awaitRunning(1, TimeUnit.SECONDS);
+
+    JobCatalogListener l = Mockito.mock(JobCatalogListener.class);
+
+    JobSpec js1_1 = JobSpec.builder("test:job1").withVersion("1").build();
+    JobSpec js1_2 = JobSpec.builder("test:job1").withVersion("2").build();
+    JobSpec js1_3 = JobSpec.builder("test:job1").withVersion("3").build();
+    JobSpec js2 = JobSpec.builder("test:job2").withVersion("1").build();
+
+    cat.put(js1_1);
+    cat.addListener(l);
+    cat.put(js1_2);
+    cat.put(js2);
+    cat.put(js1_3);
+
+    JobSpec js1_latest_version = cat.getJobSpec(js1_1.getUri());
+    Assert.assertEquals(js1_3, js1_latest_version);
+
+    cat.remove(js2.getUri());
+    cat.remove(new URI("test:dummy_job")); // doesn't exist: won't be found, so expect no callback
+    cat.removeListener(l);
+    cat.remove(js1_3.getUri());
+
+    Mockito.verify(l).onAddJob(Mockito.eq(js1_1));
+    Mockito.verify(l).onUpdateJob(Mockito.eq(js1_2));
+    Mockito.verify(l).onAddJob(Mockito.eq(js2));
+    Mockito.verify(l).onUpdateJob(Mockito.eq(js1_3));
+    Mockito.verify(l).onDeleteJob(Mockito.eq(js2.getUri()), Mockito.eq(js2.getVersion()));
+
+    Mockito.verifyNoMoreInteractions(l);
+
+    cat.stopAsync();
+    cat.awaitTerminated(1, TimeUnit.SECONDS);
+  }
+
+  @Test
+  public void testMetrics() throws Exception {
+    cat.startAsync();
+    cat.awaitRunning(1, TimeUnit.SECONDS);
+
+    MetricsAssert ma = new MetricsAssert(cat.getMetricContext());
+
+    JobSpec js1_1 = JobSpec.builder("test:job1").withVersion("1").build();
+    JobSpec js1_2 = JobSpec.builder("test:job1").withVersion("2").build();
+    JobSpec js1_3 = JobSpec.builder("test:job1").withVersion("3").build();
+    JobSpec js2 = JobSpec.builder("test:job2").withVersion("1").build();
+
+    cat.put(js1_1);
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_ADDED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_1.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_1.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.put(js1_2);
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_UPDATED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_2.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_2.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.put(js2);
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_ADDED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js2.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js2.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.put(js1_3);
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_UPDATED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_3.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_3.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.remove(js2.getUri());
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_DELETED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js2.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js2.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.remove(new URI("test:dummy_job"));
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
+
+    cat.remove(js1_3.getUri());
+    Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 2);
+    ma.assertEvent(Predicates.and(
+        MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+        MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+                                     JobCatalog.StandardMetrics.JOB_DELETED_OPERATION_TYPE),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_3.getUri().toString()),
+        MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_3.getVersion())
+        ),
+        100, TimeUnit.MILLISECONDS);
+
+    cat.stopAsync();
+    cat.awaitTerminated(1, TimeUnit.SECONDS);
+  }
+}