You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/03 23:15:07 UTC
[gobblin] branch master updated: [GOBBLIN-1569] Add RDBMS-backed
`MysqlJobCatalog`, as alternative to file system storage (#3421)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b1aed5c [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)
b1aed5c is described below
commit b1aed5c865734b4aaab5acec6ed8c78bd7ada886
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Wed Nov 3 16:15:00 2021 -0700
[GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)
* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage
* Streamline `JobSpecDeserializer` error handling, on review feedback.
* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.
* Fix javadoc slipup
---
.../runtime/job_catalog/ImmutableFSJobCatalog.java | 1 -
.../runtime/job_catalog/MysqlJobCatalog.java | 215 +++++++++++++++++++++
...lowSpecSerDe.java => GenericGsonSpecSerDe.java} | 25 +--
.../runtime/spec_serde/GsonFlowSpecSerDe.java | 35 +---
.../runtime/spec_serde/GsonJobSpecSerDe.java | 33 ++++
.../runtime/spec_serde/JobSpecDeserializer.java | 77 ++++++++
.../runtime/spec_serde/JobSpecSerializer.java | 47 +++++
.../runtime/job_catalog/TestMysqlJobCatalog.java | 212 ++++++++++++++++++++
8 files changed, 601 insertions(+), 44 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 86e2b96..7ad4f11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -208,7 +208,6 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
/**
* Fetch single job file based on its URI,
- * return null requested URI not existed
* @param uri The relative Path to the target job configuration.
* @return
*/
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
new file mode 100644
index 0000000..14a6829
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/MysqlJobCatalog.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_serde.GsonJobSpecSerDe;
+import org.apache.gobblin.runtime.spec_store.MysqlBaseSpecStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MySQL-backed Job Catalog for persisting {@link JobSpec} job configuration information, which fully supports (mutation)
+ * listeners and metrics.
+ */
+public class MysqlJobCatalog extends JobCatalogBase implements MutableJobCatalog {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MysqlJobCatalog.class);
+ public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
+
+ protected final MutableJobCatalog.MutableStandardMetrics mutableMetrics;
+ protected final MysqlBaseSpecStore jobSpecStore;
+
+ /**
+ * Initialize, with DB config contextualized by `DB_CONFIG_PREFIX`.
+ *
+ * Delegates to the three-argument constructor, supplying no parent {@link MetricContext} and enabling
+ * instrumentation according to `GobblinMetrics.isEnabled(sysConfig)`.
+ *
+ * @param sysConfig system config; DB settings are read from beneath the `DB_CONFIG_PREFIX` prefix
+ * @throws IOException as declared by the constructor delegated to
+ */
+ public MysqlJobCatalog(Config sysConfig)
+ throws IOException {
+ this(sysConfig, Optional.<MetricContext>absent(), GobblinMetrics.isEnabled(sysConfig));
+ }
+
+ public MysqlJobCatalog(GobblinInstanceEnvironment env) throws IOException {
+ super(env);
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(env.getSysConfig().getConfig());
+ }
+
+ /**
+ * Initialize from explicit metrics settings, with DB config contextualized by `DB_CONFIG_PREFIX`.
+ *
+ * @param sysConfig system config; passed to the base class and used to create the backing spec store
+ * @param parentMetricContext optional parent metric context, passed through to the base class
+ * @param instrumentationEnabled whether instrumentation is enabled, passed through to the base class
+ * @throws IOException as declared; NOTE(review): an `IOException` from store creation itself is rethrown as a
+ * `RuntimeException` by `createJobSpecStore()`, so presumably this can only originate in the base class - confirm
+ */
+ public MysqlJobCatalog(Config sysConfig, Optional<MetricContext> parentMetricContext,
+ boolean instrumentationEnabled) throws IOException {
+ super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig));
+ // `metrics` is inherited; this class overrides `createStandardMetrics()` to return the mutable variant
+ this.mutableMetrics = (MutableJobCatalog.MutableStandardMetrics) metrics;
+ this.jobSpecStore = createJobSpecStore(sysConfig);
+ }
+
+ @Override
+ protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
+ log.info("create standard metrics {} for {}", MutableJobCatalog.MutableStandardMetrics.class.getName(), this.getClass().getName());
+ return new MutableJobCatalog.MutableStandardMetrics(this, sysConfig);
+ }
+
+ protected MysqlBaseSpecStore createJobSpecStore(Config sysConfig) {
+ try {
+ return new MysqlBaseSpecStore(sysConfig, new GsonJobSpecSerDe()) {
+ @Override
+ protected String getConfigPrefix() {
+ return MysqlJobCatalog.DB_CONFIG_PREFIX;
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("unable to create `JobSpec` store", e);
+ }
+ }
+
+  /**
+   * Load every {@link JobSpec} held by the backing store, all at once.
+   *
+   * @return all {@link JobSpec}s
+   * @throws RuntimeException wrapping any {@link IOException} raised by the store
+   */
+  @Override
+  public List<JobSpec> getJobs() {
+    try {
+      // This catalog itself only ever writes `JobSpec`s to the store (see `put()`), so narrowing the element type
+      // is expected to be safe; go through `List<?>` so the unchecked step is explicit instead of using a raw `List`.
+      @SuppressWarnings("unchecked")
+      List<JobSpec> jobSpecs = (List<JobSpec>) (List<?>) jobSpecStore.getSpecs();
+      return jobSpecs;
+    } catch (IOException e) {
+      throw new RuntimeException("error getting (all) job specs", e);
+    }
+  }
+
+ /**
+ * Obtain an iterator to fetch all job specifications. Unlike {@link #getJobs()}, this method avoids loading
+ * all job configs into memory in the very beginning: only the URIs are requested from the store up front, and each
+ * {@link JobSpec} is then looked up individually, by URI, through {@link #getJobSpec(URI)}.
+ * Interleaving notes: jobs added/modified/deleted between `Iterator` creation and exhaustion MAY or MAY NOT be reflected.
+ * In particular, a URI whose spec can no longer be found when it is looked up is logged and skipped.
+ *
+ * @return an iterator for (all) present {@link JobSpec}s
+ * @throws RuntimeException wrapping any {@link IOException} raised while obtaining the URIs; note that
+ * {@link #getJobSpec(URI)} likewise throws `RuntimeException` on store I/O errors, which here would surface
+ * from the returned iterator rather than from this method
+ */
+ @Override
+ public Iterator<JobSpec> getJobSpecIterator() {
+ try {
+ // three stages, innermost first: URI -> Optional<JobSpec>; drop the absent ones; unwrap what remains
+ return Iterators.<Optional<JobSpec>, JobSpec>transform(
+ Iterators.filter(
+ Iterators.transform(jobSpecStore.getSpecURIs(), uri -> {
+ try {
+ return Optional.of(MysqlJobCatalog.this.getJobSpec(uri));
+ } catch (JobSpecNotFoundException e) {
+ // not an error: the spec may have been removed after its URI was listed
+ MysqlJobCatalog.this.log.info("unable to retrieve previously identified JobSpec by URI '{}'", uri);
+ return Optional.absent();
+ }}),
+ Optional::isPresent),
+ Optional::get);
+ } catch (IOException e) {
+ throw new RuntimeException("error iterating (all) job specs", e);
+ }
+ }
+
+ /**
+ * Fetch single {@link JobSpec} by URI.
+ * @return the `JobSpec`
+ * @throws {@link JobSpecNotFoundException}
+ */
+ @Override
+ public JobSpec getJobSpec(URI uri)
+ throws JobSpecNotFoundException {
+ Preconditions.checkNotNull(uri);
+ try {
+ return (JobSpec) jobSpecStore.getSpec(uri);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("error accessing job spec '%s'", uri), e);
+ } catch (SpecNotFoundException e) {
+ throw new JobSpecNotFoundException(uri);
+ }
+ }
+
+  /**
+   * Store a {@link JobSpec}: added when its URI is not yet present in the store, otherwise updated in place. Either
+   * way the put-time metric is recorded and the corresponding
+   * {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback (`onAddJob` or `onUpdateJob`) is triggered.
+   *
+   * NOTE: `synchronized` (w/ `remove()`) for integrity of (existence) check-then-update.
+   *
+   * @param jobSpec the spec to add or update; must not be null
+   * @throws RuntimeException wrapping any {@link IOException} raised by the store
+   */
+  @Override
+  public synchronized void put(JobSpec jobSpec) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(jobSpec);
+    try {
+      final long startTime = System.currentTimeMillis();
+      if (!jobSpecStore.exists(jobSpec.getUri())) {
+        // unknown URI: plain insertion
+        jobSpecStore.addSpec(jobSpec);
+        this.mutableMetrics.updatePutJobTime(startTime);
+        this.listeners.onAddJob(jobSpec);
+        return;
+      }
+      // known URI: overwrite the stored spec
+      try {
+        jobSpecStore.updateSpec(jobSpec);
+        this.mutableMetrics.updatePutJobTime(startTime);
+        this.listeners.onUpdateJob(jobSpec);
+      } catch (SpecNotFoundException e) { // should never happen (since `synchronized`)
+        throw new RuntimeException(String.format("error finding spec to update '%s'", jobSpec.getUri()), e);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("error updating or adding JobSpec '%s'", jobSpec.getUri()), e);
+    }
+  }
+
+ /**
+ * Delete (an existing) {@link JobSpec}, triggering the appropriate {@link org.apache.gobblin.runtime.api.JobCatalogListener} callback.
+ * Equivalent to {@link #remove(URI, boolean)} with `alwaysTriggerListeners` set to `false`.
+ *
+ * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+ *
+ * @param jobURI identifies the job spec to delete
+ */
+ @Override
+ public synchronized void remove(URI jobURI) {
+ remove(jobURI, false);
+ }
+
+  /**
+   * Delete the {@link JobSpec} stored under `jobURI`, if any.
+   *
+   * When the spec is found it is deleted, the remove-time metric is recorded, and listeners receive `onDeleteJob`
+   * with the deleted spec's version. When the store reports no such spec, a warning is logged and nothing else
+   * happens (apart from the optional cancel callback below).
+   *
+   * NOTE: `synchronized` w/ `put()` to protect its check-then-update.
+   *
+   * @param jobURI identifies the job spec to delete; must not be null
+   * @param alwaysTriggerListeners whether invariably to trigger {@link org.apache.gobblin.runtime.api.JobCatalogListener#onCancelJob(URI)}
+   * @throws RuntimeException wrapping any {@link IOException} raised by the store
+   */
+  @Override
+  public synchronized void remove(URI jobURI, boolean alwaysTriggerListeners) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(jobURI);
+    try {
+      long startTime = System.currentTimeMillis();
+      // fetch before deleting: the version is needed for the `onDeleteJob` callback
+      JobSpec jobSpec = (JobSpec) jobSpecStore.getSpec(jobURI);
+      jobSpecStore.deleteSpec(jobURI);
+      this.mutableMetrics.updateRemoveJobTime(startTime);
+      this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
+    } catch (SpecNotFoundException e) {
+      LOGGER.warn("Unknown job spec URI: '{}'. Deletion failed.", jobURI);
+    } catch (IOException e) {
+      // keep `e` as the cause, so the underlying stack trace is not lost
+      throw new RuntimeException("When removing a JobConf. file, issues unexpected happen:" + e.getMessage(), e);
+    } finally {
+      // HelixRetriggeringJobCallable deletes the job file after submitting it to helix,
+      // so trigger listeners regardless of its existence.
+      if (alwaysTriggerListeners) {
+        this.listeners.onCancelJob(jobURI);
+      }
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
similarity index 62%
copy from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
copy to gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
index dd8d20d..7fe083a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GenericGsonSpecSerDe.java
@@ -19,33 +19,36 @@ package org.apache.gobblin.runtime.spec_serde;
import com.google.common.base.Charsets;
import com.google.gson.Gson;
+import com.google.gson.JsonDeserializer;
import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-
-import org.apache.gobblin.runtime.api.FlowSpec;
+import com.google.gson.JsonSerializer;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
/**
- * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that currently only {@link FlowSpec}s are supported.
+ * Abstract {@link SpecSerDe} providing scaffolding to serialize a specific {@link Spec} derived class as JSON using {@link Gson}.
+ * Extending classes MUST supply a per-instance serializer and a deserializer and SHOULD be concrete (non-generic), to permit
+ * naming/reference from properties-based configuration.
*/
-public class GsonFlowSpecSerDe implements SpecSerDe {
- private GsonSerDe<FlowSpec> gsonSerDe;
+public abstract class GenericGsonSpecSerDe<T extends Spec> implements SpecSerDe {
+ private final GsonSerDe<T> gsonSerDe;
+ private final Class<T> specClass;
- public GsonFlowSpecSerDe() {
- this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(), new FlowSpecSerializer(), new FlowSpecDeserializer());
+ protected GenericGsonSpecSerDe(Class<T> specClass, JsonSerializer<T> serializer, JsonDeserializer<T> deserializer) {
+ this.specClass = specClass;
+ this.gsonSerDe = new GsonSerDe<T>(specClass, serializer, deserializer);
}
@Override
public byte[] serialize(Spec spec) throws SpecSerDeException {
- if (!(spec instanceof FlowSpec)) {
- throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only FlowSpec is supported");
+ if (!specClass.isInstance(spec)) {
+ throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only " + specClass.getName() + " is supported");
}
try {
- return this.gsonSerDe.serialize((FlowSpec) spec).getBytes(Charsets.UTF_8);
+ return this.gsonSerDe.serialize(specClass.cast(spec)).getBytes(Charsets.UTF_8);
} catch (JsonParseException e) {
throw new SpecSerDeException(spec, e);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
index dd8d20d..21e9140 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
@@ -17,46 +17,17 @@
package org.apache.gobblin.runtime.spec_serde;
-import com.google.common.base.Charsets;
import com.google.gson.Gson;
-import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
/**
- * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that currently only {@link FlowSpec}s are supported.
+ * {@link SpecSerDe} for {@link FlowSpec}s that serializes as JSON using {@link Gson}.
*/
-public class GsonFlowSpecSerDe implements SpecSerDe {
- private GsonSerDe<FlowSpec> gsonSerDe;
+public class GsonFlowSpecSerDe extends GenericGsonSpecSerDe<FlowSpec> {
public GsonFlowSpecSerDe() {
- this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(), new FlowSpecSerializer(), new FlowSpecDeserializer());
- }
-
- @Override
- public byte[] serialize(Spec spec) throws SpecSerDeException {
- if (!(spec instanceof FlowSpec)) {
- throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() + ", only FlowSpec is supported");
- }
-
- try {
- return this.gsonSerDe.serialize((FlowSpec) spec).getBytes(Charsets.UTF_8);
- } catch (JsonParseException e) {
- throw new SpecSerDeException(spec, e);
- }
- }
-
- @Override
- public Spec deserialize(byte[] spec) throws SpecSerDeException {
- try {
- return this.gsonSerDe.deserialize(new String(spec, Charsets.UTF_8));
- } catch (JsonParseException e) {
- throw new SpecSerDeException(e);
- }
+ super(FlowSpec.class, new FlowSpecSerializer(), new FlowSpecDeserializer());
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
new file mode 100644
index 0000000..3cc494e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonJobSpecSerDe.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.Gson;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+/**
+ * {@link SpecSerDe} for {@link JobSpec}s that serializes as JSON using {@link Gson}.
+ * All behavior comes from {@link GenericGsonSpecSerDe}; this class only binds it to {@link JobSpec}, using
+ * {@link JobSpecSerializer} and {@link JobSpecDeserializer}.
+ */
+public class GsonJobSpecSerDe extends GenericGsonSpecSerDe<JobSpec> {
+
+ /** No-arg constructor, wiring in the `JobSpec`-specific serializer and deserializer. */
+ public GsonJobSpecSerDe() {
+ super(JobSpec.class, new JobSpecSerializer(), new JobSpecDeserializer());
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java
new file mode 100644
index 0000000..e09d32d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecDeserializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobSpecDeserializer implements JsonDeserializer<JobSpec> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobSpecDeserializer.class);
+
+ @Override
+ public JobSpec deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) {
+ JsonObject jsonObject = json.getAsJsonObject();
+
+ String uri = jsonObject.get(JobSpecSerializer.JOB_SPEC_URI_KEY).getAsString();
+ String version = jsonObject.get(JobSpecSerializer.JOB_SPEC_VERSION_KEY).getAsString();
+ String description = jsonObject.get(JobSpecSerializer.JOB_SPEC_DESCRIPTION_KEY).getAsString();
+ Optional<URI> templateURI = Optional.ofNullable(jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY)).flatMap(jsonElem -> {
+ try {
+ return Optional.of(new URI(jsonElem.getAsString()));
+ } catch (URISyntaxException | RuntimeException e) {
+ LOGGER.warn(String.format("error deserializing '%s' as a URI: %s", jsonElem.toString(), e.getMessage()));
+ return Optional.empty();
+ }
+ });
+ Properties properties;
+
+ try {
+ properties = context.deserialize(jsonObject.get(JobSpecSerializer.JOB_SPEC_CONFIG_AS_PROPERTIES_KEY), Properties.class);
+ } catch (JsonParseException e) {
+ // for backward compatibility... (is this needed for `JobSpec`, or only for (inspiration) `FlowSpec`???)
+ properties = new Properties();
+ try {
+ properties.load(new StringReader(jsonObject.get(JobSpecSerializer.JOB_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
+ } catch (IOException ioe) {
+ throw new JsonParseException(e);
+ }
+ }
+
+ JobSpec.Builder builder = JobSpec.builder(uri).withVersion(version).withDescription(description).withConfigAsProperties(properties);
+ if (templateURI.isPresent()) {
+ builder = builder.withTemplate(templateURI.get());
+ }
+
+ return builder.build();
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java
new file mode 100644
index 0000000..01b57c2
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JobSpecSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import java.lang.reflect.Type;
+import org.apache.gobblin.runtime.api.JobSpec;
+
+
+/**
+ * Gson {@link JsonSerializer} that writes a {@link JobSpec} as a JSON object holding its URI, version, description,
+ * (optional) template URI, and config as properties, under the keys defined here.
+ */
+public class JobSpecSerializer implements JsonSerializer<JobSpec> {
+  public static final String JOB_SPEC_URI_KEY = "uri";
+  public static final String JOB_SPEC_VERSION_KEY = "version";
+  public static final String JOB_SPEC_DESCRIPTION_KEY = "description";
+  public static final String JOB_SPEC_TEMPLATE_URI_KEY = "templateURI";
+  public static final String JOB_SPEC_CONFIG_AS_PROPERTIES_KEY = "configAsProperties";
+
+  /**
+   * Convert a {@link JobSpec} to its JSON representation.
+   *
+   * @param src the job spec to serialize
+   * @param typeOfSrc unused
+   * @param context used to serialize each individual field
+   * @return a JSON object with one member per key constant of this class; the template URI member is `null`
+   *         when the spec has no template URI
+   */
+  @Override
+  public JsonElement serialize(JobSpec src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject jobSpecJson = new JsonObject();
+
+    jobSpecJson.add(JOB_SPEC_URI_KEY, context.serialize(src.getUri()));
+    jobSpecJson.add(JOB_SPEC_VERSION_KEY, context.serialize(src.getVersion()));
+    jobSpecJson.add(JOB_SPEC_DESCRIPTION_KEY, context.serialize(src.getDescription()));
+    // serialize the template *URI* (the very value just checked for presence), NOT the resolved job template:
+    // the latter may be absent even when the URI is present, and is not what gets read back under this key
+    jobSpecJson.add(JOB_SPEC_TEMPLATE_URI_KEY, src.getTemplateURI().isPresent() ? context.serialize(src.getTemplateURI().get()) : null);
+    jobSpecJson.add(JOB_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(src.getConfigAsProperties()));
+
+    return jobSpecJson;
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
new file mode 100644
index 0000000..72a914f
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_catalog;
+
+import com.google.common.base.Predicates;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.test.MetricsAssert;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobCatalogListener;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/** Verify {@link MysqlJobCatalog} [modeled on {@link TestInMemoryJobCatalog}] */
+public class TestMysqlJobCatalog {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "job_catalog";
+
+ private MysqlJobCatalog cat;
+
+  /** Before each test: obtain a fresh test DB and point a brand-new `JobCatalog` at it, so tests stay independent. */
+  @BeforeMethod
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    // every DB setting lives beneath the catalog's own config prefix
+    final String dbKeyPrefix = MysqlJobCatalog.DB_CONFIG_PREFIX + ".";
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder = configBuilder.addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true");
+    configBuilder = configBuilder.addPrimitive(dbKeyPrefix + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl());
+    configBuilder = configBuilder.addPrimitive(dbKeyPrefix + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER);
+    configBuilder = configBuilder.addPrimitive(dbKeyPrefix + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD);
+    configBuilder = configBuilder.addPrimitive(dbKeyPrefix + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE);
+    Config config = configBuilder.build();
+
+    this.cat = new MysqlJobCatalog(config);
+  }
+
+ /**
+ * Verify which {@link JobCatalogListener} callbacks fire for a sequence of `put()`s and `remove()`s, including
+ * that nothing is reported for an unknown URI nor after the listener has been removed.
+ */
+ @Test
+ public void testCallbacks() throws Exception {
+ cat.startAsync();
+ cat.awaitRunning(1, TimeUnit.SECONDS);
+
+ JobCatalogListener l = Mockito.mock(JobCatalogListener.class);
+
+ JobSpec js1_1 = JobSpec.builder("test:job1").withVersion("1").build();
+ JobSpec js1_2 = JobSpec.builder("test:job1").withVersion("2").build();
+ JobSpec js1_3 = JobSpec.builder("test:job1").withVersion("3").build();
+ JobSpec js2 = JobSpec.builder("test:job2").withVersion("1").build();
+
+ // NOTE(review): `js1_1` is put BEFORE the listener is registered, yet `onAddJob(js1_1)` is verified below;
+ // presumably `addListener()` replays already-present jobs to a newly added listener - confirm in the base class
+ cat.put(js1_1);
+ cat.addListener(l);
+ cat.put(js1_2);
+ cat.put(js2);
+ cat.put(js1_3);
+
+ // same URI throughout, so the latest `put()` should have won
+ JobSpec js1_latest_version = cat.getJobSpec(js1_1.getUri());
+ Assert.assertEquals(js1_3, js1_latest_version);
+
+ cat.remove(js2.getUri());
+ cat.remove(new URI("test:dummy_job")); // doesn't exist: won't be found, so expect no callback
+ cat.removeListener(l);
+ cat.remove(js1_3.getUri()); // listener already removed, so expect no callback
+
+ Mockito.verify(l).onAddJob(Mockito.eq(js1_1));
+ Mockito.verify(l).onUpdateJob(Mockito.eq(js1_2));
+ Mockito.verify(l).onAddJob(Mockito.eq(js2));
+ Mockito.verify(l).onUpdateJob(Mockito.eq(js1_3));
+ Mockito.verify(l).onDeleteJob(Mockito.eq(js2.getUri()), Mockito.eq(js2.getVersion()));
+
+ Mockito.verifyNoMoreInteractions(l);
+
+ cat.stopAsync();
+ cat.awaitTerminated(1, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testMetrics() throws Exception {
+ cat.startAsync();
+ cat.awaitRunning(1, TimeUnit.SECONDS);
+
+ MetricsAssert ma = new MetricsAssert(cat.getMetricContext());
+
+ JobSpec js1_1 = JobSpec.builder("test:job1").withVersion("1").build();
+ JobSpec js1_2 = JobSpec.builder("test:job1").withVersion("2").build();
+ JobSpec js1_3 = JobSpec.builder("test:job1").withVersion("3").build();
+ JobSpec js2 = JobSpec.builder("test:job2").withVersion("1").build();
+
+ cat.put(js1_1);
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_ADDED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_1.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_1.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.put(js1_2);
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_UPDATED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_2.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_2.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.put(js2);
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_ADDED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js2.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js2.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.put(js1_3);
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_UPDATED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_3.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_3.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.remove(js2.getUri());
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_DELETED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js2.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js2.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.remove(new URI("test:dummy_job"));
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
+
+ cat.remove(js1_3.getUri());
+ Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 2);
+ ma.assertEvent(Predicates.and(
+ MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
+ MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.OPERATION_TYPE_META,
+ JobCatalog.StandardMetrics.JOB_DELETED_OPERATION_TYPE),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_URI_META, js1_3.getUri().toString()),
+ MetricsAssert.eqEventMetdata(GobblinMetricsKeys.JOB_SPEC_VERSION_META, js1_3.getVersion())
+ ),
+ 100, TimeUnit.MILLISECONDS);
+
+ cat.stopAsync();
+ cat.awaitTerminated(1, TimeUnit.SECONDS);
+ }
+}