You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/20 22:37:20 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1150] spec
catalog table schema change
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 72373ee [GOBBLIN-1150] spec catalog table schema change
72373ee is described below
commit 72373eebff6b669bd8e001966ffab4e6ded8ab7b
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed May 20 15:37:12 2020 -0700
[GOBBLIN-1150] spec catalog table schema change
Closes #2988 from arjun4084346/jsonConfig
---
.travis.yml | 14 ++---
.../testing/TestMetastoreDatabaseServer.java | 2 +-
.../gobblin/runtime/HighLevelConsumerTest.java | 25 ++++----
.../service/StreamingKafkaSpecExecutorTest.java | 17 +++--
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
.../job_exec/JobLauncherExecutionDriver.java | 2 +-
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 51 ++++++++-------
.../runtime/spec_store/MysqlSpecStoreTest.java | 72 +++++++++++++++-------
.../service/monitoring/JobStatusRetrieverTest.java | 7 ++-
.../java/org/apache/gobblin/util/ConfigUtils.java | 11 +++-
.../util/callbacks/CallbacksDispatcher.java | 2 +-
gradle/scripts/dependencyDefinitions.gradle | 2 +-
gradle/scripts/javaVersionCheck.gradle | 2 +-
travis/test.sh | 2 +-
14 files changed, 125 insertions(+), 86 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 9815489..d742b91 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
language: java
-dist: trusty
+dist: xenial
sudo: required
addons:
@@ -9,9 +9,6 @@ addons:
- libaio-dev
- libdbus-glib-1-dev
- xsltproc
- - mysql-server-5.6
- - mysql-client-core-5.6
- - mysql-client-5.6
before_cache:
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
@@ -22,9 +19,10 @@ cache:
- $HOME/.gradle/wrapper/
before_install:
- - "export DISPLAY=:99.0"
- - "sh -e /etc/init.d/xvfb start"
- - sleep 3 # give xvfb some time to start
+
+services:
+ - xvfb
+ - mysql
before_script:
- mysql -uroot -e "create user testUser identified by 'testPassword';"
@@ -46,7 +44,7 @@ env:
- RUN_TEST_GROUP=coverage
jdk:
- - oraclejdk8
+ - openjdk8
matrix:
exclude:
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
index 599d16f..fb70092 100644
--- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
@@ -91,7 +91,7 @@ class TestMetastoreDatabaseServer implements Closeable {
this.dbHost,
this.dbPort);
- config = MysqldConfig.aMysqldConfig(Version.v5_6_latest)
+ config = MysqldConfig.aMysqldConfig(Version.v8_latest)
.withPort(this.dbPort)
.withUser(this.dbUserName, this.dbUserPassword)
.build();
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index 75c667b..c101d15 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Properties;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
@@ -34,22 +33,25 @@ import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
@Test
+@Slf4j
public class HighLevelConsumerTest extends KafkaTestBase {
private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
@@ -61,7 +63,6 @@ public class HighLevelConsumerTest extends KafkaTestBase {
private Closer _closer;
private String _kafkaBrokers;
- private AsyncDataWriter dataWriter;
public HighLevelConsumerTest()
throws InterruptedException, RuntimeException {
@@ -79,9 +80,9 @@ public class HighLevelConsumerTest extends KafkaTestBase {
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, this.getZkConnectString());
producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(NUM_PARTITIONS));
- dataWriter = _closer.register(new Kafka09DataWriter(producerProps));
+ AsyncDataWriter<byte[]> dataWriter = _closer.register(new Kafka09DataWriter<byte[], byte[]>(producerProps));
- List<byte[]> records = createByteArrayMessages(NUM_MSGS);
+ List<byte[]> records = createByteArrayMessages();
WriteCallback mock = Mockito.mock(WriteCallback.class);
for(byte[] record : records) {
dataWriter.write(record, mock);
@@ -135,24 +136,20 @@ public class HighLevelConsumerTest extends KafkaTestBase {
NUM_PARTITIONS);
consumer.startAsync().awaitRunning();
- consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- }
+ consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
- GobblinKafkaConsumerClient client = consumer.getGobblinKafkaConsumerClient();
for(int i=0; i< NUM_PARTITIONS; i++) {
KafkaPartition partition = new KafkaPartition.Builder().withTopicName(TOPIC).withId(i).build();
- Assert.assertTrue(consumer.getCommittedOffsets().containsKey(partition));
+ AssertWithBackoff.assertTrue(input -> consumer.getCommittedOffsets().containsKey(partition),
+ 5000, "waiting for committing offsets", log, 2, 1000);
}
consumer.shutDown();
}
- private List<byte[]> createByteArrayMessages(int numMsgs) {
+ private List<byte[]> createByteArrayMessages() {
List<byte[]> records = Lists.newArrayList();
- for(int i=0; i<numMsgs; i++) {
+ for(int i=0; i<NUM_MSGS; i++) {
byte[] msg = ("msg_" + i).getBytes();
records.add(msg);
}
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index 26d3e3b..abbdc61 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -63,6 +63,8 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
private String _kafkaBrokers;
private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+ String specUriString = "/foo/bar/spec";
+ Spec spec = initJobSpec(specUriString);
@BeforeSuite
public void beforeSuite() {
@@ -114,8 +116,6 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
// SEI Producer
_seip = _closer.register(new SimpleKafkaSpecProducer(config));
- String addedSpecUriString = "/foo/bar/addedSpec";
- Spec spec = initJobSpec(addedSpecUriString);
WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
log.info("WriteResponse: " + writeResponse);
@@ -131,15 +131,13 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
- Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}
@Test (dependsOnMethods = "testAddSpec")
public void testUpdateSpec() throws Exception {
// update is only treated as an update for existing job specs
- String updatedSpecUriString = "/foo/bar/addedSpec";
- Spec spec = initJobSpec(updatedSpecUriString);
WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get();
log.info("WriteResponse: " + writeResponse);
@@ -148,15 +146,14 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match");
- Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}
@Test (dependsOnMethods = "testUpdateSpec")
public void testDeleteSpec() throws Exception {
// delete needs to be on a job spec that exists to get notification
- String deletedSpecUriString = "/foo/bar/addedSpec";
- WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get();
+ WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(specUriString)).get();
log.info("WriteResponse: " + writeResponse);
List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
@@ -164,11 +161,11 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
- Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}
- private JobSpec initJobSpec(String specUri) {
+ private static JobSpec initJobSpec(String specUri) {
Properties properties = new Properties();
return JobSpec.builder(specUri)
.withConfig(ConfigUtils.propertiesToConfig(properties))
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 98249aa..cce4c38 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -625,7 +625,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
* Subclasses can override this method to do whatever processing on the {@link TaskState}s,
* e.g., aggregate task-level metrics into job-level metrics.
*
- * @deprecated Use {@link #postProcessJobState(JobState)
+ * @deprecated Use {@link #postProcessJobState(JobState)}
*/
@Deprecated
protected void postProcessTaskStates(@SuppressWarnings("unused") List<TaskState> taskStates) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
index 8e16ca6..ea79cc4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_exec/JobLauncherExecutionDriver.java
@@ -334,7 +334,7 @@ public class JobLauncherExecutionDriver extends FutureTask<JobExecutionResult> i
* <p>Conventions
* <ul>
* <li>If no jobLauncherType is specified, one will be determined by the JobSpec
- * (see {@link JobLauncherFactory).
+ * (see {@link JobLauncherFactory}).
* <li> Convention for sysConfig: use the sysConfig of the gobblinInstance if specified,
* otherwise use empty config.
* <li> Convention for log: use gobblinInstance logger plus "." + jobSpec if specified, otherwise
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 657c18f..53a2b7e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.runtime.spec_store;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
-import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -30,6 +29,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
@@ -46,7 +46,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
-
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -62,20 +62,28 @@ import org.apache.gobblin.runtime.api.SpecStore;
public class MysqlSpecStore implements SpecStore {
public static final String CONFIG_PREFIX = "mysqlSpecStore";
public static final String DEFAULT_TAG_VALUE = "";
+ private static final String NEW_COLUMN = "spec_json";
private static final String CREATE_TABLE_STATEMENT =
- "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, tag VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri))";
+ "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, flow_group VARCHAR(128), flow_name VARCHAR(128), "
+ + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), source_identifier VARCHAR(128), "
+ + "destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
+ + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
+ + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
- private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+ protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec, " + NEW_COLUMN + ") "
+ + "VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
- private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE spec_uri = ?";
- private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
- private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri, spec FROM %s WHERE tag = ?";
+ private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
+ private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
+ private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
+ private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
- private final DataSource dataSource;
- private final String tableName;
+ protected final DataSource dataSource;
+ protected final String tableName;
private final URI specStoreURI;
- private final SpecSerDe specSerDe;
+ protected final SpecSerDe specSerDe;
public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
@@ -117,15 +125,14 @@ public class MysqlSpecStore implements SpecStore {
/**
* Temporarily only used for testing since tag it not exposed in endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}
*/
- public void addSpec(Spec spec, String tagValue) throws IOException{
+ public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-
statement.setString(1, spec.getUri().toString());
statement.setString(2, tagValue);
statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+ statement.setString(4, new String(this.specSerDe.serialize(spec), Charsets.UTF_8));
statement.executeUpdate();
-
connection.commit();
} catch (SQLException | SpecSerDeException e) {
throw new IOException(e);
@@ -156,6 +163,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
+ // TODO : this method is not doing what the contract is in the SpecStore interface
public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
addSpec(spec);
return spec;
@@ -165,16 +173,15 @@ public class MysqlSpecStore implements SpecStore {
public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
-
statement.setString(1, specUri.toString());
try (ResultSet rs = statement.executeQuery()) {
if (!rs.next()) {
throw new SpecNotFoundException(specUri);
}
-
- Blob blob = rs.getBlob(1);
- return this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
+ return rs.getString(2) == null
+ ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(1).getBinaryStream()))
+ : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8));
}
} catch (SQLException | SpecSerDeException e) {
throw new IOException(e);
@@ -195,14 +202,16 @@ public class MysqlSpecStore implements SpecStore {
public Collection<Spec> getSpecs() throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
-
List<Spec> specs = new ArrayList<>();
try (ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
try {
- Blob blob = rs.getBlob(2);
- specs.add(this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream())));
+ specs.add(
+ rs.getString(3) == null
+ ? this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
+ : this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(3).getBinaryStream()))
+ );
} catch (SQLException | SpecSerDeException e) {
log.error("Failed to deserialize spec", e);
}
@@ -218,7 +227,7 @@ public class MysqlSpecStore implements SpecStore {
@Override
public Iterator<URI> getSpecURIs() throws IOException {
try (Connection connection = this.dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+ PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
return getURIIteratorByQuery(statement);
} catch (SQLException e) {
throw new IOException(e);
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 10357b4..16e08b8 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -17,14 +17,18 @@
package org.apache.gobblin.runtime.spec_store;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.SerializationUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -38,8 +42,9 @@ import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
-import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
+import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
public class MysqlSpecStoreTest {
@@ -48,6 +53,7 @@ public class MysqlSpecStoreTest {
private static final String TABLE = "spec_store";
private MysqlSpecStore specStore;
+ private MysqlSpecStore oldSpecStore;
private URI uri1 = URI.create("flowspec1");
private URI uri2 = URI.create("flowspec2");
private URI uri3 = URI.create("flowspec3");
@@ -79,6 +85,7 @@ public class MysqlSpecStoreTest {
.build();
this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
+ this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
}
@Test
@@ -91,11 +98,8 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
}
- @Test
+ @Test (dependsOnMethods = "testAddSpec")
public void testGetSpec() throws Exception {
- this.specStore.addSpec(this.flowSpec1);
- this.specStore.addSpec(this.flowSpec2);
-
FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
Assert.assertEquals(result, this.flowSpec1);
@@ -126,43 +130,65 @@ public class MysqlSpecStoreTest {
.withVersion("Test version 5")
.build();
- this.specStore.addSpec(flowSpec3);
this.specStore.addSpec(flowSpec4, "dr");
this.specStore.addSpec(flowSpec5, "dr");
- Assert.assertTrue(this.specStore.exists(uri3));
Assert.assertTrue(this.specStore.exists(uri4));
Assert.assertTrue(this.specStore.exists(uri5));
- List<URI> result = new ArrayList();
+ List<URI> result = new ArrayList<>();
this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
Assert.assertEquals(result.size(), 2);
}
- @Test
+ @Test (expectedExceptions = {IOException.class})
public void testGetCorruptedSpec() throws Exception {
- this.specStore.addSpec(this.flowSpec1);
- this.specStore.addSpec(this.flowSpec2);
this.specStore.addSpec(this.flowSpec3);
-
- Collection<Spec> specs = this.specStore.getSpecs();
- Assert.assertTrue(specs.contains(this.flowSpec1));
- Assert.assertTrue(specs.contains(this.flowSpec2));
- Assert.assertFalse(specs.contains(this.flowSpec3));
}
- @Test
+ @Test (dependsOnMethods = "testGetSpec")
public void testDeleteSpec() throws Exception {
- this.specStore.addSpec(this.flowSpec1);
- Assert.assertTrue(this.specStore.exists(this.uri1));
-
this.specStore.deleteSpec(this.uri1);
Assert.assertFalse(this.specStore.exists(this.uri1));
}
- public class TestSpecSerDe extends JavaSpecSerDe {
+ @Test (dependsOnMethods = "testDeleteSpec")
+ public void testReadOldColumn() throws Exception {
+ this.oldSpecStore.addSpec(this.flowSpec1);
+
+ FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+ Assert.assertEquals(spec, this.flowSpec1);
+ }
+
+ /**
+ * A {@link MysqlSpecStore} which does not write into the new spec_json column
+ * to simulate behavior of a table with old data.
+ */
+ public static class OldSpecStore extends MysqlSpecStore {
+
+ public OldSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+ super(config, specSerDe);
+ }
+
+ @Override
+ public void addSpec(Spec spec, String tagValue) throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
+ statement.setString(1, spec.getUri().toString());
+ statement.setString(2, tagValue);
+ statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+ statement.setString(4, null);
+ statement.executeUpdate();
+ connection.commit();
+ } catch (SQLException | SpecSerDeException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public class TestSpecSerDe extends GsonFlowSpecSerDe {
@Override
public byte[] serialize(Spec spec) throws SpecSerDeException {
- byte[] bytes = SerializationUtils.serialize(spec);
+ byte[] bytes = super.serialize(spec);
// Reverse bytes to simulate corrupted Spec
if (spec.getUri().equals(uri3)) {
ArrayUtils.reverse(bytes);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 1ab3b24..a6546e2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -106,7 +106,7 @@ public abstract class JobStatusRetrieverTest {
jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
jobStatus = jobStatusIterator.next();
- if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
jobStatus = jobStatusIterator.next();
}
Assert.assertTrue(jobStatus.getJobName().equals(MY_JOB_NAME_1) || jobStatus.getJobName().equals(MY_JOB_NAME_2));
@@ -115,6 +115,11 @@ public abstract class JobStatusRetrieverTest {
String nextExpectedJobName = (MY_JOB_NAME_1.equals(jobName)) ? MY_JOB_NAME_2 : MY_JOB_NAME_1;
Assert.assertTrue(jobStatusIterator.hasNext());
jobStatus = jobStatusIterator.next();
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+ Assert.assertTrue(jobStatusIterator.hasNext());
+ jobStatus = jobStatusIterator.next();
+ }
+
Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
index 026bba2..33b9e09 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
@@ -244,7 +244,7 @@ public class ConfigUtils {
!blacklistedKeys.contains(entryKey)) {
if (fullPrefixKeys.contains(entryKey)) {
entryKey = sanitizeFullPrefixKey(entryKey);
- } else if (entryKey.endsWith(STRIP_SUFFIX)) {
+ } else if (sanitizedKey(entryKey)) {
throw new RuntimeException("Properties are not allowed to end in " + STRIP_SUFFIX);
}
immutableMapBuilder.put(entryKey, entry.getValue());
@@ -257,8 +257,15 @@ public class ConfigUtils {
return propKey + STRIP_SUFFIX;
}
+ /**
+ * returns true if is it a sanitized key
+ */
+ public static boolean sanitizedKey(String propKey) {
+ return propKey.endsWith(STRIP_SUFFIX);
+ }
+
public static String desanitizeKey(String propKey) {
- propKey = propKey.endsWith(STRIP_SUFFIX) ?
+ propKey = sanitizedKey(propKey) ?
propKey.substring(0, propKey.length() - STRIP_SUFFIX.length()) : propKey;
// Also strip quotes that can get introduced by TypeSafe.Config
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
index a4f3562..144556c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbacksDispatcher.java
@@ -44,7 +44,7 @@ import lombok.Data;
/**
* A helper to dispatch callbacks to a set of listeners. The CallbacksDispatcher is responsible for
* managing the list if listeners which implement a common interface L. Invocation happens through
- * the {@link #execCallbacks(CallbackFactory) method.
+ * the {@link #execCallbacks(CallbackFactory)} method.
*
* @param L the listener type; it is strongly advised that the class implements toString() to
* provide useful logging
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index cf6f515..67435a6 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -157,7 +157,7 @@ ext.externalDependency = [
"jsonAssert": "org.skyscreamer:jsonassert:1.3.0",
"reflections" : "org.reflections:reflections:0.9.10",
"embeddedProcess": "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.2",
- "testMysqlServer": "com.wix:wix-embedded-mysql:4.2.0",
+ "testMysqlServer": "com.wix:wix-embedded-mysql:4.6.1",
"flyway": "org.flywaydb:flyway-core:3.2.1",
"oltu": "org.apache.oltu.oauth2:org.apache.oltu.oauth2.client:1.0.2",
"googleAnalytics": "com.google.apis:google-api-services-analytics:v3-rev134-1.22.0",
diff --git a/gradle/scripts/javaVersionCheck.gradle b/gradle/scripts/javaVersionCheck.gradle
index 5159be2..8a9e59f 100644
--- a/gradle/scripts/javaVersionCheck.gradle
+++ b/gradle/scripts/javaVersionCheck.gradle
@@ -2,7 +2,7 @@ gradle.taskGraph.whenReady {
gradle.taskGraph.allTasks.each { task ->
def taskProject = task.project
if (taskProject.hasProperty('requiresJavaVersion') && !taskProject.requiresJavaVersion.equals(javaVersion)) {
- logger.warn("WARNING: Project {} requres Java version {} which conflicts with build version {}. COMPILATION DISABLED. Please use -PjdkVersion={} .",
+ logger.warn("WARNING: Project {} requires Java version {} which conflicts with build version {}. COMPILATION DISABLED. Please use -PjdkVersion={} .",
taskProject.name,
taskProject.requiresJavaVersion,
javaVersion,
diff --git a/travis/test.sh b/travis/test.sh
index 576d9bd..261da9c 100755
--- a/travis/test.sh
+++ b/travis/test.sh
@@ -30,7 +30,7 @@ RUN_TEST_GROUP=${RUN_TEST_GROUP:-default}
script_dir=$(dirname $0)
echo "Old GRADLE_OPTS=$GRADLE_OPTS"
-export java_version=$(java -version 2>&1 | grep 'java version' | sed -e 's/java version "\(1\..\).*/\1/')
+export java_version=$(java -version 2>&1 | grep 'openjdk version' | sed -e 's/openjdk version "\(1\..\).*/\1/')
echo "Using Java version:${java_version}"