You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/20 22:37:20 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1150] spec catalog table schema change

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 72373ee  [GOBBLIN-1150] spec catalog table schema change
72373ee is described below

commit 72373eebff6b669bd8e001966ffab4e6ded8ab7b
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed May 20 15:37:12 2020 -0700

    [GOBBLIN-1150] spec catalog table schema change
    
    Closes #2988 from arjun4084346/jsonConfig
---
 .travis.yml                                        | 14 ++---
 .../testing/TestMetastoreDatabaseServer.java       |  2 +-
 .../gobblin/runtime/HighLevelConsumerTest.java     | 25 ++++----
 .../service/StreamingKafkaSpecExecutorTest.java    | 17 +++--
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 +-
 .../job_exec/JobLauncherExecutionDriver.java       |  2 +-
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 51 ++++++++-------
 .../runtime/spec_store/MysqlSpecStoreTest.java     | 72 +++++++++++++++-------
 .../service/monitoring/JobStatusRetrieverTest.java |  7 ++-
 .../java/org/apache/gobblin/util/ConfigUtils.java  | 11 +++-
 .../util/callbacks/CallbacksDispatcher.java        |  2 +-
 gradle/scripts/dependencyDefinitions.gradle        |  2 +-
 gradle/scripts/javaVersionCheck.gradle             |  2 +-
 travis/test.sh                                     |  2 +-
 14 files changed, 125 insertions(+), 86 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 9815489..d742b91 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 language: java
 
-dist: trusty
+dist: xenial
 sudo: required
 
 addons:
@@ -9,9 +9,6 @@ addons:
       - libaio-dev
       - libdbus-glib-1-dev
       - xsltproc
-      - mysql-server-5.6
-      - mysql-client-core-5.6
-      - mysql-client-5.6
 
 before_cache:
   - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
@@ -22,9 +19,10 @@ cache:
     - $HOME/.gradle/wrapper/
 
 before_install:
-  - "export DISPLAY=:99.0"
-  - "sh -e /etc/init.d/xvfb start"
-  - sleep 3 # give xvfb some time to start
+
+services:
+  - xvfb
+  - mysql
 
 before_script:
   - mysql -uroot -e "create user testUser identified by 'testPassword';"
@@ -46,7 +44,7 @@ env:
   - RUN_TEST_GROUP=coverage
 
 jdk:
-  - oraclejdk8
+  - openjdk8
 
 matrix:
   exclude:
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
index 599d16f..fb70092 100644
--- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
@@ -91,7 +91,7 @@ class TestMetastoreDatabaseServer implements Closeable {
                   this.dbHost,
                   this.dbPort);
 
-    config = MysqldConfig.aMysqldConfig(Version.v5_6_latest)
+    config = MysqldConfig.aMysqldConfig(Version.v8_latest)
         .withPort(this.dbPort)
         .withUser(this.dbUserName, this.dbUserPassword)
         .build();
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index 75c667b..c101d15 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Properties;
 
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
@@ -34,22 +33,25 @@ import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 
 
 @Test
+@Slf4j
 public class HighLevelConsumerTest extends KafkaTestBase {
 
   private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
@@ -61,7 +63,6 @@ public class HighLevelConsumerTest extends KafkaTestBase {
 
   private Closer _closer;
   private String _kafkaBrokers;
-  private AsyncDataWriter dataWriter;
 
   public HighLevelConsumerTest()
       throws InterruptedException, RuntimeException {
@@ -79,9 +80,9 @@ public class HighLevelConsumerTest extends KafkaTestBase {
     producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
     producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
     producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
-    dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+    AsyncDataWriter<byte[]> dataWriter = _closer.register(new Kafka09DataWriter<byte[], byte[]>(producerProps));
 
-    List<byte[]> records = createByteArrayMessages(NUM_MSGS);
+    List<byte[]> records = createByteArrayMessages();
     WriteCallback mock = Mockito.mock(WriteCallback.class);
     for(byte[] record : records) {
       dataWriter.write(record, mock);
@@ -135,24 +136,20 @@ public class HighLevelConsumerTest extends KafkaTestBase {
         NUM_PARTITIONS);
     consumer.startAsync().awaitRunning();
 
-    consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
-    try {
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-    }
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
 
-    GobblinKafkaConsumerClient client  = consumer.getGobblinKafkaConsumerClient();
     for(int i=0; i< NUM_PARTITIONS; i++) {
       KafkaPartition partition = new KafkaPartition.Builder().withTopicName(TOPIC).withId(i).build();
-      Assert.assertTrue(consumer.getCommittedOffsets().containsKey(partition));
+      AssertWithBackoff.assertTrue(input -> consumer.getCommittedOffsets().containsKey(partition),
+          5000, "waiting for committing offsets", log, 2, 1000);
     }
     consumer.shutDown();
   }
 
-  private List<byte[]> createByteArrayMessages(int numMsgs) {
+  private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
 
-    for(int i=0; i<numMsgs; i++) {
+    for(int i=0; i<NUM_MSGS; i++) {
       byte[] msg = ("msg_" + i).getBytes();
       records.add(msg);
     }
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index 26d3e3b..abbdc61 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -63,6 +63,8 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private String _kafkaBrokers;
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+  String specUriString = "/foo/bar/spec";
+  Spec spec = initJobSpec(specUriString);
 
   @BeforeSuite
   public void beforeSuite() {
@@ -114,8 +116,6 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
     // SEI Producer
     _seip = _closer.register(new SimpleKafkaSpecProducer(config));
 
-    String addedSpecUriString = "/foo/bar/addedSpec";
-    Spec spec = initJobSpec(addedSpecUriString);
     WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
     log.info("WriteResponse: " + writeResponse);
 
@@ -131,15 +131,13 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
 
     Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
     Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }
 
   @Test (dependsOnMethods = "testAddSpec")
   public void testUpdateSpec() throws Exception {
     // update is only treated as an update for existing job specs
-    String updatedSpecUriString = "/foo/bar/addedSpec";
-    Spec spec = initJobSpec(updatedSpecUriString);
     WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
     log.info("WriteResponse: " + writeResponse);
 
@@ -148,15 +146,14 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
 
     Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
     Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }
 
   @Test (dependsOnMethods = "testUpdateSpec")
   public void testDeleteSpec() throws Exception {
     // delete needs to be on a job spec that exists to get notification
-    String deletedSpecUriString = "/foo/bar/addedSpec";
-    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
+    WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(specUriString)).get();
     log.info("WriteResponse: " + writeResponse);
 
     List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
@@ -164,11 +161,11 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
 
     Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
     Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
-    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }
 
-  private JobSpec initJobSpec(String specUri) {
+  private static JobSpec initJobSpec(String specUri) {
     Properties properties = new Properties();
     return JobSpec.builder(specUri)
         .withConfig(ConfigUtils.propertiesToConfig(properties))
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 98249aa..cce4c38 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -625,7 +625,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
    * Subclasses can override this method to do whatever processing on the {@link TaskState}s,
    * e.g., aggregate task-level metrics into job-level metrics.
    *
-   * @deprecated Use {@link #postProcessJobState(JobState)
+   * @deprecated Use {@link #postProcessJobState(JobState)}
    */
   @Deprecated
   protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState> taskStates) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
index 8e16ca6..ea79cc4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
@@ -334,7 +334,7 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i
    * <p>Conventions
    * <ul>
    *  <li>If no jobLauncherType is specified, one will be determined by the JobSpec
-   *  (see {@link JobLauncherFactory).
+   *  (see {@link JobLauncherFactory}).
    *  <li> Convention for sysConfig: use the sysConfig of the gobblinInstance if specified,
    *       otherwise use empty config.
    *  <li> Convention for log: use gobblinInstance logger plus "." + jobSpec if specified, otherwise
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 657c18f..53a2b7e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.runtime.spec_store;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -30,6 +29,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
@@ -46,7 +46,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
-
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -62,20 +62,28 @@ import org.apache.gobblin.runtime.api.SpecStore;
 public class MysqlSpecStore implements SpecStore {
   public static final String CONFIG_PREFIX = "mysqlSpecStore";
   public static final String DEFAULT_TAG_VALUE = "";
+  private static final String NEW_COLUMN = "spec_json";
 
   private static final String CREATE_TABLE_STATEMENT =
-      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, tag VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri))";
+      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, flow_group VARCHAR(128), flow_name VARCHAR(128), "
+          + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), source_identifier VARCHAR(128), "
+          + "destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
+          + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+          + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
+          + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
-  private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec, " + NEW_COLUMN + ") "
+      + "VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
-  private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE spec_uri = ?";
-  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
-  private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri, spec FROM %s WHERE tag = ?";
+  private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
+  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
+  private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
 
-  private final DataSource dataSource;
-  private final String tableName;
+  protected final DataSource dataSource;
+  protected final String tableName;
   private final URI specStoreURI;
-  private final SpecSerDe specSerDe;
+  protected final SpecSerDe specSerDe;
 
   public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
     if (config.hasPath(CONFIG_PREFIX)) {
@@ -117,15 +125,14 @@ public class MysqlSpecStore implements SpecStore {
   /**
    * Temporarily only used for testing since tag it not exposed in endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}
    */
-  public void addSpec(Spec spec, String tagValue) throws IOException{
+  public void addSpec(Spec spec, String tagValue) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-
       statement.setString(1, spec.getUri().toString());
       statement.setString(2, tagValue);
       statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+      statement.setString(4, new String(this.specSerDe.serialize(spec), Charsets.UTF_8));
       statement.executeUpdate();
-
       connection.commit();
     } catch (SQLException | SpecSerDeException e) {
       throw new IOException(e);
@@ -156,6 +163,7 @@ public class MysqlSpecStore implements SpecStore {
   }
 
   @Override
+  // TODO : this method is not doing what the contract is in the SpecStore interface
   public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
     addSpec(spec);
     return spec;
@@ -165,16 +173,15 @@ public class MysqlSpecStore implements SpecStore {
   public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
-
       statement.setString(1, specUri.toString());
 
       try (ResultSet rs = statement.executeQuery()) {
         if (!rs.next()) {
           throw new SpecNotFoundException(specUri);
         }
-
-        Blob blob = rs.getBlob(1);
-        return this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
+        return rs.getString(2) == null
+            ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(1).getBinaryStream()))
+            : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8));
       }
     } catch (SQLException | SpecSerDeException e) {
       throw new IOException(e);
@@ -195,14 +202,16 @@ public class MysqlSpecStore implements SpecStore {
   public Collection<Spec> getSpecs() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
-
       List<Spec> specs = new ArrayList<>();
 
       try (ResultSet rs = statement.executeQuery()) {
         while (rs.next()) {
           try {
-            Blob blob = rs.getBlob(2);
-            specs.add(this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream())));
+            specs.add(
+                rs.getString(3) == null
+                    ? this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
+                    : this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(3).getBinaryStream()))
+            );
           } catch (SQLException | SpecSerDeException e) {
             log.error("Failed to deserialize spec", e);
           }
@@ -218,7 +227,7 @@ public class MysqlSpecStore implements SpecStore {
   @Override
   public Iterator<URI> getSpecURIs() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
       return getURIIteratorByQuery(statement);
     } catch (SQLException e) {
       throw new IOException(e);
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 10357b4..16e08b8 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -17,14 +17,18 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
 import java.util.List;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.SerializationUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -38,8 +42,9 @@ import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
-import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
+import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
 
 
 public class MysqlSpecStoreTest {
@@ -48,6 +53,7 @@ public class MysqlSpecStoreTest {
   private static final String TABLE = "spec_store";
 
   private MysqlSpecStore specStore;
+  private MysqlSpecStore oldSpecStore;
   private URI uri1 = URI.create("flowspec1");
   private URI uri2 = URI.create("flowspec2");
   private URI uri3 = URI.create("flowspec3");
@@ -79,6 +85,7 @@ public class MysqlSpecStoreTest {
         .build();
 
     this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
+    this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
   }
 
   @Test
@@ -91,11 +98,8 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddSpec")
   public void testGetSpec() throws Exception {
-    this.specStore.addSpec(this.flowSpec1);
-    this.specStore.addSpec(this.flowSpec2);
-
     FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
     Assert.assertEquals(result, this.flowSpec1);
 
@@ -126,43 +130,65 @@ public class MysqlSpecStoreTest {
         .withVersion("Test version 5")
         .build();
 
-    this.specStore.addSpec(flowSpec3);
     this.specStore.addSpec(flowSpec4, "dr");
     this.specStore.addSpec(flowSpec5, "dr");
 
-    Assert.assertTrue(this.specStore.exists(uri3));
     Assert.assertTrue(this.specStore.exists(uri4));
     Assert.assertTrue(this.specStore.exists(uri5));
-    List<URI> result = new ArrayList();
+    List<URI> result = new ArrayList<>();
     this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
     Assert.assertEquals(result.size(), 2);
   }
 
-  @Test
+  @Test (expectedExceptions = {IOException.class})
   public void testGetCorruptedSpec() throws Exception {
-    this.specStore.addSpec(this.flowSpec1);
-    this.specStore.addSpec(this.flowSpec2);
     this.specStore.addSpec(this.flowSpec3);
-
-    Collection<Spec> specs = this.specStore.getSpecs();
-    Assert.assertTrue(specs.contains(this.flowSpec1));
-    Assert.assertTrue(specs.contains(this.flowSpec2));
-    Assert.assertFalse(specs.contains(this.flowSpec3));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testGetSpec")
   public void testDeleteSpec() throws Exception {
-    this.specStore.addSpec(this.flowSpec1);
-    Assert.assertTrue(this.specStore.exists(this.uri1));
-
     this.specStore.deleteSpec(this.uri1);
     Assert.assertFalse(this.specStore.exists(this.uri1));
   }
 
-  public class TestSpecSerDe extends JavaSpecSerDe {
+  @Test (dependsOnMethods = "testDeleteSpec")
+  public void testReadOldColumn() throws Exception {
+    this.oldSpecStore.addSpec(this.flowSpec1);
+
+    FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(spec, this.flowSpec1);
+  }
+
+  /**
+   * A {@link MysqlSpecStore} which does not write into the new spec_json column
+   * to simulate behavior of a table with old data.
+   */
+  public static class OldSpecStore extends MysqlSpecStore {
+
+    public OldSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+      super(config, specSerDe);
+    }
+
+    @Override
+    public void addSpec(Spec spec, String tagValue) throws IOException {
+      try (Connection connection = this.dataSource.getConnection();
+          PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
+        statement.setString(1, spec.getUri().toString());
+        statement.setString(2, tagValue);
+        statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+        statement.setString(4, null);
+        statement.executeUpdate();
+        connection.commit();
+      } catch (SQLException | SpecSerDeException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  public class TestSpecSerDe extends GsonFlowSpecSerDe {
     @Override
     public byte[] serialize(Spec spec) throws SpecSerDeException {
-      byte[] bytes = SerializationUtils.serialize(spec);
+      byte[] bytes = super.serialize(spec);
       // Reverse bytes to simulate corrupted Spec
       if (spec.getUri().equals(uri3)) {
         ArrayUtils.reverse(bytes);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 1ab3b24..a6546e2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -106,7 +106,7 @@ public abstract class JobStatusRetrieverTest {
     jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
-    if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+    if (JobStatusRetriever.isFlowStatus(jobStatus)) {
       jobStatus = jobStatusIterator.next();
     }
     Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
@@ -115,6 +115,11 @@ public abstract class JobStatusRetrieverTest {
     String nextExpectedJobName = (MY_JOB_NAME_1.equals(jobName)) ? MY_JOB_NAME_2 : MY_JOB_NAME_1;
     Assert.assertTrue(jobStatusIterator.hasNext());
     jobStatus = jobStatusIterator.next();
+    if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+      Assert.assertTrue(jobStatusIterator.hasNext());
+      jobStatus = jobStatusIterator.next();
+    }
+
     Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
   }
 
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
index 026bba2..33b9e09 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
@@ -244,7 +244,7 @@ public class ConfigUtils {
           !blacklistedKeys.contains(entryKey)) {
         if (fullPrefixKeys.contains(entryKey)) {
           entryKey = sanitizeFullPrefixKey(entryKey);
-        } else if (entryKey.endsWith(STRIP_SUFFIX)) {
+        } else if (sanitizedKey(entryKey)) {
           throw new RuntimeException("Properties are not allowed to end in " + STRIP_SUFFIX);
         }
         immutableMapBuilder.put(entryKey, entry.getValue());
@@ -257,8 +257,15 @@ public class ConfigUtils {
     return propKey + STRIP_SUFFIX;
   }
 
+  /**
+   * returns true if it is a sanitized key
+   */
+  public static boolean sanitizedKey(String propKey) {
+    return propKey.endsWith(STRIP_SUFFIX);
+  }
+
   public static String desanitizeKey(String propKey) {
-    propKey =  propKey.endsWith(STRIP_SUFFIX) ?
+    propKey =  sanitizedKey(propKey) ?
         propKey.substring(0, propKey.length() - STRIP_SUFFIX.length()) : propKey;
 
     // Also strip quotes that can get introduced by TypeSafe.Config
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
index a4f3562..144556c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
@@ -44,7 +44,7 @@ import lombok.Data;
 /**
  * A helper to dispatch callbacks to a set of listeners. The CallbacksDispatcher is responsible for
  * managing the list if listeners which implement a common interface L. Invocation happens through
- * the {@link #execCallbacks(CallbackFactory) method.
+ * the {@link #execCallbacks(CallbackFactory)} method.
  *
  * @param L     the listener type; it is strongly advised that the class implements toString() to
  *              provide useful logging
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index cf6f515..67435a6 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -157,7 +157,7 @@ ext.externalDependency = [
     "jsonAssert": "org.skyscreamer:jsonassert:1.3.0",
     "reflections" : "org.reflections:reflections:0.9.10",
     "embeddedProcess": "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.2",
-    "testMysqlServer": "com.wix:wix-embedded-mysql:4.2.0",
+    "testMysqlServer": "com.wix:wix-embedded-mysql:4.6.1",
     "flyway": "org.flywaydb:flyway-core:3.2.1",
     "oltu": "org.apache.oltu.oauth2:org.apache.oltu.oauth2.client:1.0.2",
     "googleAnalytics": "com.google.apis:google-api-services-analytics:v3-rev134-1.22.0",
diff --git a/gradle/scripts/javaVersionCheck.gradle b/gradle/scripts/javaVersionCheck.gradle
index 5159be2..8a9e59f 100644
--- a/gradle/scripts/javaVersionCheck.gradle
+++ b/gradle/scripts/javaVersionCheck.gradle
@@ -2,7 +2,7 @@ gradle.taskGraph.whenReady {
    gradle.taskGraph.allTasks.each { task ->
         def taskProject = task.project
         if (taskProject.hasProperty('requiresJavaVersion') && !taskProject.requiresJavaVersion.equals(javaVersion)) {
-            logger.warn("WARNING: Project {} requres Java version {} which conflicts with build version {}. COMPILATION DISABLED. Please use -PjdkVersion={} .",
+            logger.warn("WARNING: Project {} requires Java version {} which conflicts with build version {}. COMPILATION DISABLED. Please use -PjdkVersion={} .",
                   taskProject.name,
                   taskProject.requiresJavaVersion,
                   javaVersion,
diff --git a/travis/test.sh b/travis/test.sh
index 576d9bd..261da9c 100755
--- a/travis/test.sh
+++ b/travis/test.sh
@@ -30,7 +30,7 @@ RUN_TEST_GROUP=${RUN_TEST_GROUP:-default}
 script_dir=$(dirname $0)
 echo "Old GRADLE_OPTS=$GRADLE_OPTS"
 
-export java_version=$(java -version 2>&1 | grep 'java version' | sed -e 's/java version "\(1\..\).*/\1/')
+export java_version=$(java -version 2>&1 | grep 'openjdk version' | sed -e 's/openjdk version "\(1\..\).*/\1/')
 
 echo "Using Java version:${java_version}"