You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/10/25 20:26:19 UTC
[gobblin] branch master updated: Refactor `MysqlSpecStore` into a
generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s),
also useable for `TopologySpec`s (#3414)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 255bdc1 Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)
255bdc1 is described below
commit 255bdc15f4e8dd7dbf5fb94e9742d8430f2506f3
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Mon Oct 25 13:26:11 2021 -0700
Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)
* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s
* Add missing file, `MysqlNonFlowSpecStoreTest`
* Fixup `MysqlNonFlowSpecStoreTest`
* Simplify implementation of `MysqlSpecStore.getSpecsImpl`.
* Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.
* Aid maintainers with additional code comments
---
.../gobblin/runtime/api/FlowSpecSearchObject.java | 141 ++++++
.../gobblin/runtime/api/InstrumentedSpecStore.java | 8 +
.../gobblin/runtime/api/SpecSearchObject.java | 11 +
.../gobblin/runtime/spec_store/FSSpecStore.java | 2 +-
.../runtime/spec_store/MysqlBaseSpecStore.java | 318 ++++++++++++++
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 473 ++++-----------------
.../runtime/spec_store/MysqlBaseSpecStoreTest.java | 159 +++++++
.../runtime/spec_store/MysqlSpecStoreTest.java | 8 +-
8 files changed, 728 insertions(+), 392 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index ef6737c..644d5f4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -17,12 +17,21 @@
package org.apache.gobblin.runtime.api;
+import java.io.IOException;
import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Splitter;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.FlowId;
@@ -34,6 +43,7 @@ import org.apache.gobblin.service.FlowId;
@Builder
@ToString
@AllArgsConstructor
+@Slf4j
public class FlowSpecSearchObject implements SpecSearchObject {
private final URI flowSpecUri;
private final String flowGroup;
@@ -51,4 +61,135 @@ public class FlowSpecSearchObject implements SpecSearchObject {
public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
return FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
}
+
+  /**
+   * Appends this object's non-null restrictions to {@code baseStatement} as {@code AND}-joined conditions, each holding
+   * unbound {@code ?} placeholders that {@link #completePreparedStatement(PreparedStatement)} later binds.
+   *
+   * <p>{@code propertyFilter} is a comma-separated list of entries, each matched against the spec's JSON
+   * {@code configAsProperties}:
+   * <ul>
+   *   <li>{@code myKey=myValue} requires the value at {@code myKey} to contain the string {@code myValue};</li>
+   *   <li>{@code myKey} (no {@code =}) merely requires {@code myKey} to be present;</li>
+   *   <li>an entry that does not split on {@code =} into exactly a key and a value is logged and ignored.</li>
+   * </ul>
+   * Keys and values are caller-supplied, so they are bound as parameters rather than concatenated into the SQL text.
+   *
+   * @param baseStatement SQL prefix to extend; expected to end with {@code WHERE }, since conditions are appended directly
+   * @return {@code baseStatement} followed by the conditions
+   * @throws IOException if no condition results, i.e. at least one restriction of {@code this} must be set
+   */
+  @Override
+  public String augmentBaseGetStatement(String baseStatement)
+      throws IOException {
+    List<String> conditions = new ArrayList<>();
+
+    /*
+     * IMPORTANT: the order of `conditions` added must align with the order of parameter binding later in `completePreparedStatement`!
+     */
+
+    if (this.getFlowSpecUri() != null) {
+      conditions.add("spec_uri = ?");
+    }
+
+    if (this.getFlowGroup() != null) {
+      conditions.add("flow_group = ?");
+    }
+
+    if (this.getFlowName() != null) {
+      conditions.add("flow_name = ?");
+    }
+
+    if (this.getTemplateURI() != null) {
+      conditions.add("template_uri = ?");
+    }
+
+    if (this.getUserToProxy() != null) {
+      conditions.add("user_to_proxy = ?");
+    }
+
+    if (this.getSourceIdentifier() != null) {
+      conditions.add("source_identifier = ?");
+    }
+
+    if (this.getDestinationIdentifier() != null) {
+      conditions.add("destination_identifier = ?");
+    }
+
+    if (this.getSchedule() != null) {
+      conditions.add("schedule = ?");
+    }
+
+    if (this.getModifiedTimestamp() != null) {
+      conditions.add("modified_time = ?");
+    }
+
+    if (this.getIsRunImmediately() != null) {
+      conditions.add("isRunImmediately = ?");
+    }
+
+    if (this.getOwningGroup() != null) {
+      conditions.add("owning_group = ?");
+    }
+
+    // Both the JSON path and the LIKE pattern are placeholders, never concatenated user input. `JSON_EXTRACT` is used
+    // instead of the `->` operator, because `->` only accepts a string-literal path, not a bound parameter.
+    for (String[] keyValue : parsePropertyFilter(true)) {
+      conditions.add(keyValue[1] != null
+          ? "JSON_EXTRACT(spec_json, ?) like ?"
+          : "JSON_EXTRACT(spec_json, ?) is not null");
+    }
+
+    if (conditions.isEmpty()) {
+      throw new IOException("At least one condition is required to query flow configs.");
+    }
+
+    return baseStatement + String.join(" AND ", conditions);
+  }
+
+  /**
+   * Binds, in order, every placeholder introduced by {@link #augmentBaseGetStatement(String)}.
+   *
+   * @param statement prepared from the result of {@link #augmentBaseGetStatement(String)} on this same object
+   * @throws SQLException if binding a parameter fails
+   */
+  @Override
+  public void completePreparedStatement(PreparedStatement statement)
+      throws SQLException {
+    int i = 0;
+
+    /*
+     * IMPORTANT: the order of binding params must align with the order of building the conditions earlier in `augmentBaseGetStatement`!
+     */
+
+    if (this.getFlowSpecUri() != null) {
+      statement.setString(++i, this.getFlowSpecUri().toString());
+    }
+
+    if (this.getFlowGroup() != null) {
+      statement.setString(++i, this.getFlowGroup());
+    }
+
+    if (this.getFlowName() != null) {
+      statement.setString(++i, this.getFlowName());
+    }
+
+    if (this.getTemplateURI() != null) {
+      statement.setString(++i, this.getTemplateURI());
+    }
+
+    if (this.getUserToProxy() != null) {
+      statement.setString(++i, this.getUserToProxy());
+    }
+
+    if (this.getSourceIdentifier() != null) {
+      statement.setString(++i, this.getSourceIdentifier());
+    }
+
+    if (this.getDestinationIdentifier() != null) {
+      statement.setString(++i, this.getDestinationIdentifier());
+    }
+
+    if (this.getSchedule() != null) {
+      statement.setString(++i, this.getSchedule());
+    }
+
+    // `modified_time = ?` is a condition in `augmentBaseGetStatement`, so it needs its own binding here
+    if (this.getModifiedTimestamp() != null) {
+      statement.setString(++i, this.getModifiedTimestamp());
+    }
+
+    if (this.getIsRunImmediately() != null) {
+      statement.setBoolean(++i, this.getIsRunImmediately());
+    }
+
+    if (this.getOwningGroup() != null) {
+      statement.setString(++i, this.getOwningGroup());
+    }
+
+    for (String[] keyValue : parsePropertyFilter(false)) {
+      statement.setString(++i, toConfigPropertyJsonPath(keyValue[0]));
+      if (keyValue[1] != null) {
+        statement.setString(++i, "%" + keyValue[1] + "%");
+      }
+    }
+  }
+
+  /** JSON path, relative to the {@code spec_json} document, of {@code key} within the spec's {@code configAsProperties}. */
+  private static String toConfigPropertyJsonPath(String key) {
+    return "$.configAsProperties.\"" + key + "\"";
+  }
+
+  /**
+   * Parses {@code propertyFilter} into {@code {key, value}} pairs, in filter order; a {@code null} value denotes a
+   * key-existence check. Being deterministic, it yields identical entries for {@link #augmentBaseGetStatement(String)}
+   * and {@link #completePreparedStatement(PreparedStatement)}, keeping conditions and bindings aligned.
+   *
+   * @param logMalformed whether to log ignored entries (done once, while building the statement)
+   */
+  private List<String[]> parsePropertyFilter(boolean logMalformed) {
+    List<String[]> entries = new ArrayList<>();
+    if (this.getPropertyFilter() == null) {
+      return entries;
+    }
+    Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
+    for (String property : commaSplitter.splitToList(this.getPropertyFilter())) {
+      if (!property.contains("=")) {
+        entries.add(new String[] {property, null});
+        continue;
+      }
+      String[] keyValue = property.split("=");
+      if (keyValue.length != 2) {
+        if (logMalformed) {
+          log.error("Incorrect flow config search query: {}", property);
+        }
+        continue;
+      }
+      entries.add(keyValue);
+    }
+    return entries;
+  }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 901ef88..1f2be84 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -91,6 +91,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
private OptionallyTimingInvoker getAllTimer;
private OptionallyTimingInvoker getSizeTimer;
private OptionallyTimingInvoker getURIsTimer;
+ private OptionallyTimingInvoker getURIsWithTagTimer;
private MetricContext metricContext;
private final boolean instrumentationEnabled;
@@ -105,6 +106,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
this.getAllTimer = createTimingInvoker("-GETALL");
this.getSizeTimer = createTimingInvoker("-GETCOUNT");
this.getURIsTimer = createTimingInvoker("-GETURIS");
+ this.getURIsWithTagTimer = createTimingInvoker("-GETURISWITHTAG");
}
private OptionallyTimingInvoker createTimingInvoker(String suffix) {
@@ -159,6 +161,11 @@ public abstract class InstrumentedSpecStore implements SpecStore {
}
@Override
+  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+    // delegate to the subclass lookup, measured by the "-GETURISWITHTAG" timer
+    return getURIsWithTagTimer.invokeMayThrowIO(() -> this.getSpecURIsWithTagImpl(tag));
+  }
+
+ @Override
public int getSize() throws IOException {
// timed by getSizeTimer (created with suffix "-GETCOUNT"); the count itself comes from getSizeImpl()
return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
}
@@ -170,6 +177,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
public abstract Collection<Spec> getSpecsImpl() throws IOException;
public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+ /** Subclass lookup behind {@link #getSpecURIsWithTag(String)}, which times it with {@code getURIsWithTagTimer}. */
+ public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException;
public abstract int getSizeImpl() throws IOException;
/** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
index febf4c9..8d756dc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
@@ -17,8 +17,19 @@
package org.apache.gobblin.runtime.api;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+
/**
* This is an interface to package all the parameters that should be used to search {@link Spec} in a {@link SpecStore}
*/
public interface SpecSearchObject {
+
+  /**
+   * Constrains {@code baseStatement} by this search object's restrictions, e.g. by appending (unbound) {@code WHERE}
+   * clause conditions containing {@code ?} placeholders.
+   *
+   * @param baseStatement the SQL statement to extend
+   * @return {@code baseStatement}, further constrained by this object's restrictions
+   * @throws IOException if the restrictions cannot be rendered into a statement
+   */
+  String augmentBaseGetStatement(String baseStatement) throws IOException;
+
+  /**
+   * Binds every placeholder in {@code statement}, which must have been prepared from the result of
+   * {@link #augmentBaseGetStatement(String)} on this same object, so that placeholder count and order match.
+   *
+   * @param statement the statement prepared from the augmented SQL
+   * @throws SQLException if binding a parameter fails
+   */
+  void completePreparedStatement(PreparedStatement statement) throws SQLException;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 63e829a..921138f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -268,7 +268,7 @@ public class FSSpecStore extends InstrumentedSpecStore {
}
@Override
- public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+ public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
+ // tag-based lookup is not supported by this store: always throws, regardless of `tag`
throw new UnsupportedOperationException("Loading specs with tag is not supported in FS-Implementation of SpecStore");
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
new file mode 100644
index 0000000..c328a2f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.api.SpecStore;
+
+
+/**
+ * Implementation of {@link SpecStore} that stores specs in MySQL as a serialized BLOB, per the provided {@link SpecSerDe}.
+ * Note: versions are unsupported, so the version parameter is ignored in methods that have it.
+ *
+ * <p>A tag column is added to serve certain filtering purposes in MySQL-based SpecStore.
+ * For example, in DR mode of GaaS, we would only want certain {@link Spec}s to be eligible for orchestration
+ * by alternative GaaS instances. Another example is allow-listing/deny-listing {@link Spec}s temporarily
+ * without removing them from the {@link SpecStore}.
+ *
+ * <p>Extension points: {@link #getConfigPrefix()} and {@link #createSqlStatements()} (the latter permitting a different
+ * DB schema). Both are invoked from the constructor, so overrides must not depend on subclass instance state.
+ *
+ * <p>The {@link MysqlSpecStore} is a specialization enhanced for {@link FlowSpec} search and retrieval.
+ */
+@Slf4j
+public class MysqlBaseSpecStore extends InstrumentedSpecStore {
+
+  /**
+   * {@link java.util.function.Function} variant for an operation that may throw {@link IOException} or
+   * {@link SQLException}, preserving those checked exceptions in method signatures.
+   */
+  @FunctionalInterface
+  protected interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  public static final String CONFIG_PREFIX = "mysqlBaseSpecStore";
+  public static final String DEFAULT_TAG_VALUE = "";
+
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec) "
+      + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
+  private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM %s WHERE ";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
+  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
+  private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
+  // NOTE: using max length of a `FlowSpec` URI, as it's believed to be the longest of existing `Spec` types
+  private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
+      + ") NOT NULL, tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
+      + "CURRENT_TIMESTAMP, spec LONGBLOB, PRIMARY KEY (spec_uri))";
+
+  /**
+   * The main point of difference between this class and its subclasses is the DB schema, and hence certain of the DML
+   * and queries. Given the interrelation between statements, they are collected within this inner class, which enables
+   * selective, per-statement override and delivers them as a unit.
+   *
+   * <p>NOTE: the {@code public final} fields are initialized from the (overridable) {@code getTableless*()} methods while
+   * this object is being constructed, so overrides should return constants rather than read their own instance state.
+   */
+  protected class SqlStatements {
+    public final String existsStatement = String.format(getTablelessExistsStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String insertStatement = String.format(getTablelessInsertStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String deleteStatement = String.format(getTablelessDeleteStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getStatementBase = String.format(getTablelessGetStatementBase(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllStatement = String.format(getTablelessGetAllStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllURIsStatement = String.format(getTablelessGetAllURIsStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllURIsWithTagStatement = String.format(getTablelessGetAllURIsWithTagStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getSizeStatement = String.format(getTablelessGetSizeStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String createTableStatement = String.format(getTablelessCreateTableStatement(), MysqlBaseSpecStore.this.tableName);
+
+    /** Binds {@code spec}'s URI, {@code tagValue} and serialized form, matching the base (spec_uri, tag, spec) insert columns. */
+    public void completeInsertPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+      URI specUri = spec.getUri();
+
+      int i = 0;
+      statement.setString(++i, specUri.toString());
+      statement.setString(++i, tagValue);
+      statement.setBlob(++i, new ByteArrayInputStream(MysqlBaseSpecStore.this.specSerDe.serialize(spec)));
+    }
+
+    /** Deserializes the spec BLOB read from the second column of the current row of {@code rs}. */
+    public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
+      return MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
+    }
+
+    protected String getTablelessExistsStatement() { return MysqlBaseSpecStore.EXISTS_STATEMENT; }
+    protected String getTablelessInsertStatement() { return MysqlBaseSpecStore.INSERT_STATEMENT; }
+    protected String getTablelessDeleteStatement() { return MysqlBaseSpecStore.DELETE_STATEMENT; }
+    protected String getTablelessGetStatementBase() { return MysqlBaseSpecStore.GET_STATEMENT_BASE; }
+    protected String getTablelessGetAllStatement() { return MysqlBaseSpecStore.GET_ALL_STATEMENT; }
+    protected String getTablelessGetAllURIsStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_STATEMENT; }
+    protected String getTablelessGetAllURIsWithTagStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_WITH_TAG_STATEMENT; }
+    protected String getTablelessGetSizeStatement() { return MysqlBaseSpecStore.GET_SIZE_STATEMENT; }
+    protected String getTablelessCreateTableStatement() { return MysqlBaseSpecStore.CREATE_TABLE_STATEMENT; }
+  }
+
+
+  protected final DataSource dataSource;
+  protected final String tableName;
+  private final URI specStoreURI;
+  protected final SpecSerDe specSerDe;
+  protected final SqlStatements sqlStatements;
+
+  /**
+   * Connects to MySQL per {@code config} and creates the spec table if it does not yet exist.
+   *
+   * <p>If {@code config} has a sub-tree at {@link #getConfigPrefix()}, its settings take precedence over the root ones.
+   * Both {@link #getConfigPrefix()} and {@link #createSqlStatements()} are invoked from here, i.e. before any subclass
+   * constructor body runs.
+   *
+   * @throws IOException e.g. when creating the table fails
+   */
+  public MysqlBaseSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+    super(config, specSerDe);
+    String configPrefix = getConfigPrefix();
+    if (config.hasPath(configPrefix)) {
+      config = config.getConfig(configPrefix).withFallback(config);
+    }
+
+    this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+    this.tableName = config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
+    this.specStoreURI = URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
+    this.specSerDe = specSerDe;
+    this.sqlStatements = createSqlStatements();
+
+    withPreparedStatement(this.sqlStatements.createTableStatement, statement ->
+        statement.executeUpdate()
+    );
+  }
+
+  /** Config sub-tree whose settings override the root config; called from the constructor. */
+  protected String getConfigPrefix() {
+    return MysqlBaseSpecStore.CONFIG_PREFIX;
+  }
+
+  /** Factory for this store's SQL; called from the constructor, after {@link #tableName} is set. */
+  protected SqlStatements createSqlStatements() {
+    return new SqlStatements();
+  }
+
+  @Override
+  public boolean existsImpl(URI specUri) throws IOException {
+    return withPreparedStatement(this.sqlStatements.existsStatement, statement -> {
+      statement.setString(1, specUri.toString());
+      try (ResultSet rs = statement.executeQuery()) {
+        rs.next(); // `SELECT EXISTS(...)` always yields exactly one row
+        return rs.getBoolean(1);
+      }
+    });
+  }
+
+  @Override
+  public void addSpecImpl(Spec spec) throws IOException {
+    this.addSpec(spec, DEFAULT_TAG_VALUE);
+  }
+
+  /**
+   * Inserts {@code spec} with the given tag, or replaces the stored spec if its URI already exists, then commits.
+   * Temporarily only used for testing, since tag is not exposed in the endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}.
+   */
+  public void addSpec(Spec spec, String tagValue) throws IOException {
+    withPreparedStatement(this.sqlStatements.insertStatement, statement -> {
+      this.sqlStatements.completeInsertPreparedStatement(statement, spec, tagValue);
+      statement.executeUpdate();
+      return null; // (type: `Void`)
+    }, true);
+  }
+
+  @Override
+  public boolean deleteSpec(Spec spec) throws IOException {
+    return deleteSpec(spec.getUri());
+  }
+
+  /** @return whether any row was deleted */
+  @Override
+  public boolean deleteSpecImpl(URI specUri) throws IOException {
+    return withPreparedStatement(this.sqlStatements.deleteStatement, statement -> {
+      statement.setString(1, specUri.toString());
+      int result = statement.executeUpdate();
+      return result != 0;
+    }, true);
+  }
+
+  @Override
+  public boolean deleteSpec(URI specUri, String version) throws IOException {
+    return deleteSpec(specUri); // `version` ignored, as mentioned in javadoc
+  }
+
+  @Override
+  // TODO: fix to obey the `SpecStore` contract of returning the *updated* `Spec`
+  public Spec updateSpecImpl(Spec spec) throws IOException {
+    addSpec(spec);
+    return spec;
+  }
+
+  @Override
+  public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
+    Iterator<Spec> resultSpecs = withPreparedStatement(this.sqlStatements.getAllStatement + " WHERE spec_uri = ?", statement -> {
+      statement.setString(1, specUri.toString());
+      return retrieveSpecs(statement).iterator();
+    });
+    if (resultSpecs.hasNext()) {
+      return resultSpecs.next();
+    } else {
+      throw new SpecNotFoundException(specUri);
+    }
+  }
+
+  @Override
+  public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
+    return getSpec(specUri); // `version` ignored, as mentioned in javadoc
+  }
+
+  @Override
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
+    return Lists.newArrayList(getSpec(specUri));
+  }
+
+  @Override
+  public Collection<Spec> getSpecsImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllStatement, statement -> retrieveSpecs(statement));
+  }
+
+  /** Executes {@code statement} and deserializes every returned row into a {@link Spec}, via {@link SqlStatements#extractSpec(ResultSet)}. */
+  protected final Collection<Spec> retrieveSpecs(PreparedStatement statement) throws IOException {
+    List<Spec> specs = new ArrayList<>();
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        specs.add(this.sqlStatements.extractSpec(rs));
+      }
+    } catch (SQLException e) {
+      // kept apart from the SerDe failure below, so the log names the actual cause
+      log.error("Failed to retrieve specs", e);
+      throw new IOException(e);
+    } catch (SpecSerDeException e) {
+      log.error("Failed to deserialize spec", e);
+      throw new IOException(e);
+    }
+    return specs;
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIsImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllURIsStatement, statement -> retrieveURIs(statement).iterator());
+  }
+
+  @Override
+  public int getSizeImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getSizeStatement, statement -> {
+      try (ResultSet resultSet = statement.executeQuery()) {
+        resultSet.next(); // `SELECT COUNT(*)` always yields exactly one row
+        return resultSet.getInt(1);
+      }
+    });
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement -> {
+      statement.setString(1, tag);
+      return retrieveURIs(statement).iterator();
+    });
+  }
+
+  /** Executes {@code statement}, parsing the first column of every row as a spec {@link URI}. */
+  private List<URI> retrieveURIs(PreparedStatement statement) throws SQLException {
+    List<URI> uris = new ArrayList<>();
+
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        URI specURI = URI.create(rs.getString(1));
+        uris.add(specURI);
+      }
+    }
+
+    return uris;
+  }
+
+  @Override
+  public Optional<URI> getSpecStoreURI() {
+    return Optional.of(this.specStoreURI);
+  }
+
+  /**
+   * Abstracts the recurring pattern around resource management and exception re-mapping: obtains a connection, prepares
+   * {@code sql}, applies {@code f} to the statement and, when {@code shouldCommit}, commits; all resources are closed.
+   *
+   * @throws IOException wrapping any {@link SQLException}, or propagated as-is when thrown by {@code f}
+   */
+  protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Abstracts recurring pattern, while not presuming to DB `commit()`. */
+  protected final <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f) throws IOException {
+    return withPreparedStatement(sql, f, false);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 032da32..00e203d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -17,431 +17,130 @@
package org.apache.gobblin.runtime.spec_store;
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-
-import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
-import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
-import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
/**
- * Implementation of {@link SpecStore} that stores specs as serialized java objects in MySQL. Note that versions are not
- * supported, so the version parameter will be ignored in methods that have it.
+ * Implementation of {@link SpecStore} that stores specs in MySQL both as a serialized BLOB and as a JSON string, and extends
+ * {@link MysqlBaseSpecStore} with enhanced FlowSpec search/retrieval capabilities. The {@link SpecSerDe}'s serialized output
+ * is presumed suitable for a MySQL `JSON` column. As in the base, versions are unsupported and ignored.
*
- * A tag column is added into implementation to serve certain filtering purposes in MySQL-based SpecStore.
- * For example, in DR mode of GaaS, we would only want certain {@link Spec}s to be eligible for orchestrated
- * by alternative GaaS instances. Another example is whitelisting/blacklisting {@link Spec}s temporarily
- * but not removing it from {@link SpecStore}.
+ * NAMING: this class would more aptly be named `MysqlFlowSpecStore`, with the base named `MysqlSpecStore`; however, the `MysqlSpecStore` name's
+ * association with this class's semantics predates the refactoring/generalization into `MysqlBaseSpecStore`. Thus, to maintain
+ * backward compatibility and avoid surprising legacy users who already refer to it (e.g. by name, in configuration), the original name remains.
*/
@Slf4j
-// todo : This should be renamed to MysqlFlowSpecStore, because this implementation only stores FlowSpec, not a TopologySpec
-public class MysqlSpecStore extends InstrumentedSpecStore {
+public class MysqlSpecStore extends MysqlBaseSpecStore {
public static final String CONFIG_PREFIX = "mysqlSpecStore";
-  public static final String DEFAULT_TAG_VALUE = "";
-  private static final String NEW_COLUMN = "spec_json";
-  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
-  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
-      + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
-      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
-  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
-  private static final String GET_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s WHERE ";
-  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
-  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
-  private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
-  private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
+  // Historical Note: the `spec_json` column didn't always exist (it was introduced for GOBBLIN-1150), so reads fall back to the `spec`
+  // BLOB whenever `spec_json` is NULL... though in practice, all records should have it by now, given the usual retention expiry.
+  protected static final String SPECIFIC_INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+      + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) "
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
+  private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri, spec, spec_json FROM %s WHERE ";
+  private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json FROM %s";
+  private static final String SPECIFIC_CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR("
+      + FlowSpec.Utils.maxFlowSpecUriLength()
+      + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
+      + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + "), " + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), "
+      + "source_identifier VARCHAR(128), " + "destination_identifier VARCHAR(128), schedule VARCHAR(128), "
+      + "tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
+      + "CURRENT_TIMESTAMP, isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, "
+      + "spec_json JSON, PRIMARY KEY (spec_uri))";
+
+  /** FlowSpec-specific SQL: the extra flow-metadata columns plus `spec_json`, and how such rows are written and read back. */
+  protected class SpecificSqlStatements extends SqlStatements {
+    @Override
+    public void completeInsertPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      URI specUri = flowSpec.getUri();
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String templateURI = new Gson().toJson(flowSpec.getTemplateURIs());
+      String userToProxy = ConfigUtils.getString(flowConfig, "user.to.proxy", null);
+      String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
+      String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
+      String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+      String owningGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
+      boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
+      byte[] serializedSpec = MysqlSpecStore.this.specSerDe.serialize(flowSpec); // serialize once: the same bytes back `spec` and `spec_json`
+      int i = 0; // bind parameters in the column order of SPECIFIC_INSERT_STATEMENT
+      statement.setString(++i, specUri.toString());
+      statement.setString(++i, flowGroup);
+      statement.setString(++i, flowName);
+      statement.setString(++i, templateURI);
+      statement.setString(++i, userToProxy);
+      statement.setString(++i, sourceIdentifier);
+      statement.setString(++i, destinationIdentifier);
+      statement.setString(++i, schedule);
+      statement.setString(++i, tagValue);
+      statement.setBoolean(++i, isRunImmediately);
+      statement.setString(++i, owningGroup);
+      statement.setBlob(++i, new ByteArrayInputStream(serializedSpec));
+      statement.setString(++i, new String(serializedSpec, Charsets.UTF_8));
+    }
+
+    @Override
+    public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
+      String specJson = rs.getString(3); // NULL for rows predating the `spec_json` column: fall back to the `spec` BLOB (column 2)
+      return specJson != null ? MysqlSpecStore.this.specSerDe.deserialize(specJson.getBytes(Charsets.UTF_8))
+          : MysqlSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
+    }
+
+    @Override
+    protected String getTablelessInsertStatement() { return MysqlSpecStore.SPECIFIC_INSERT_STATEMENT; }
+    @Override
+    protected String getTablelessGetStatementBase() { return MysqlSpecStore.SPECIFIC_GET_STATEMENT_BASE; }
+    @Override
+    protected String getTablelessGetAllStatement() { return MysqlSpecStore.SPECIFIC_GET_ALL_STATEMENT; }
+    @Override
+    protected String getTablelessCreateTableStatement() { return MysqlSpecStore.SPECIFIC_CREATE_TABLE_STATEMENT; }
+  }
-  protected final DataSource dataSource;
-  protected final String tableName;
-  private final URI specStoreURI;
-  protected final SpecSerDe specSerDe;
public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
super(config, specSerDe);
-    if (config.hasPath(CONFIG_PREFIX)) {
-      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
-    }
-
-    this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
-    this.tableName = config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
-    this.specStoreURI = URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
-    this.specSerDe = specSerDe;
-
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(getCreateJobStateTableTemplate())) {
-      statement.executeUpdate();
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  protected String getCreateJobStateTableTemplate() {
-    return "CREATE TABLE IF NOT EXISTS " + this.tableName + " (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
-        + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
-        + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), " + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), "
-        + "source_identifier VARCHAR(128), " + "destination_identifier VARCHAR(128), schedule VARCHAR(128), "
-        + "tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
-        + "CURRENT_TIMESTAMP, isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, "
-        + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
-  }
-
-  @Override
-  public boolean existsImpl(URI specUri) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
-      statement.setString(1, specUri.toString());
-      try (ResultSet rs = statement.executeQuery()) {
-        rs.next();
-        return rs.getBoolean(1);
-      }
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void addSpecImpl(Spec spec) throws IOException {
-    this.addSpec(spec, DEFAULT_TAG_VALUE);
-  }
-
-  /**
-   * Temporarily only used for testing since tag it not exposed in endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}
-   */
-  public void addSpec(Spec spec, String tagValue) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-      setAddPreparedStatement(statement, spec, tagValue);
-      statement.executeUpdate();
-      connection.commit();
-    } catch (SQLException | SpecSerDeException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public boolean deleteSpec(Spec spec) throws IOException {
-    return deleteSpec(spec.getUri());
-  }
-
-  @Override
-  public boolean deleteSpecImpl(URI specUri) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
-      statement.setString(1, specUri.toString());
-      int result = statement.executeUpdate();
-      connection.commit();
-      return result != 0;
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
}
@Override
-  public boolean deleteSpec(URI specUri, String version) throws IOException {
-    return deleteSpec(specUri);
+  protected String getConfigPrefix() {
+    return MysqlSpecStore.CONFIG_PREFIX; // this subclass's own (legacy) "mysqlSpecStore" config prefix
}
@Override
-  // TODO : this method is not doing what the contract is in the SpecStore interface
-  public Spec updateSpecImpl(Spec spec) throws IOException {
-    addSpec(spec);
-    return spec;
+  protected SqlStatements createSqlStatements() {
+    return new SpecificSqlStatements(); // FlowSpec schema: flow-metadata columns plus `spec_json`
}
+  /** Supports search, unlike the base class: the {@link SpecSearchObject} (presumably a {@link org.apache.gobblin.runtime.api.FlowSpecSearchObject}) extends the base SELECT, then binds its parameters. */
@Override
-  public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
-    Iterator<Spec> specsIterator = getSpecsImpl(FlowSpecSearchObject.builder().flowSpecUri(specUri).build()).iterator();
-    if (specsIterator.hasNext()) {
-      return specsIterator.next();
-    } else {
-      throw new SpecNotFoundException(specUri);
-    }
-  }
-
public Collection<Spec> getSpecsImpl(SpecSearchObject specSearchObject) throws IOException {
-    FlowSpecSearchObject flowSpecSearchObject = (FlowSpecSearchObject) specSearchObject;
-
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(createGetPreparedStatement(flowSpecSearchObject, this.tableName))) {
-      setGetPreparedStatement(statement, flowSpecSearchObject);
-      return getSpecsInternal(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
-    return getSpec(specUri);
-  }
-
-  @Override
-  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
-    return Lists.newArrayList(getSpec(specUri));
-  }
-
-  @Override
-  public Collection<Spec> getSpecsImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
-      return getSpecsInternal(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private Collection<Spec> getSpecsInternal(PreparedStatement statement) throws IOException {
-    List<Spec> specs = new ArrayList<>();
-    try (ResultSet rs = statement.executeQuery()) {
-      while (rs.next()) {
-        specs.add(
-            rs.getString(3) == null
-                ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
-                : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
-        );
-      }
-    } catch (SQLException | SpecSerDeException e) {
-      log.error("Failed to deserialize spec", e);
-      throw new IOException(e);
-    }
-    return specs;
-  }
-
-  @Override
-  public Iterator<URI> getSpecURIsImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
-      return getURIIteratorByQuery(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public int getSizeImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_SIZE_STATEMENT, this.tableName));
-        ResultSet resultSet = statement.executeQuery()) {
-      resultSet.next();
-      return resultSet.getInt(1);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG, this.tableName))) {
-      statement.setString(1, tag);
-      return getURIIteratorByQuery(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private Iterator<URI> getURIIteratorByQuery(PreparedStatement statement) throws SQLException {
-    List<URI> specs = new ArrayList<>();
-
-    try (ResultSet rs = statement.executeQuery()) {
-      while (rs.next()) {
-        URI specURI = URI.create(rs.getString(1));
-        specs.add(specURI);
-      }
-    }
-
-    return specs.iterator();
-  }
-
-  @Override
-  public Optional<URI> getSpecStoreURI() {
-    return Optional.of(this.specStoreURI);
-  }
-
-  /** This expects at least one parameter in {@link FlowSpecSearchObject} to be not null */
-  static String createGetPreparedStatement(FlowSpecSearchObject flowSpecSearchObject, String tableName)
-      throws IOException {
-    String baseStatement = String.format(GET_STATEMENT, tableName);
-    List<String> conditions = new ArrayList<>();
-
-    if (flowSpecSearchObject.getFlowSpecUri() != null) {
-      conditions.add("spec_uri = ?");
-    }
-
-    if (flowSpecSearchObject.getFlowGroup() != null) {
-      conditions.add("flow_group = ?");
-    }
-
-    if (flowSpecSearchObject.getFlowName() != null) {
-      conditions.add("flow_name = ?");
-    }
-
-    if (flowSpecSearchObject.getTemplateURI() != null) {
-      conditions.add("template_uri = ?");
-    }
-
-    if (flowSpecSearchObject.getUserToProxy() != null) {
-      conditions.add("user_to_proxy = ?");
-    }
-
-    if (flowSpecSearchObject.getSourceIdentifier() != null) {
-      conditions.add("source_identifier = ?");
-    }
-
-    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
-      conditions.add("destination_identifier = ?");
-    }
-
-    if (flowSpecSearchObject.getSchedule() != null) {
-      conditions.add("schedule = ?");
-    }
-
-    if (flowSpecSearchObject.getModifiedTimestamp() != null) {
-      conditions.add("modified_time = ?");
-    }
-
-    if (flowSpecSearchObject.getIsRunImmediately() != null) {
-      conditions.add("isRunImmediately = ?");
-    }
-
-    if (flowSpecSearchObject.getOwningGroup() != null) {
-      conditions.add("owning_group = ?");
-    }
-
-    // If the propertyFilter is myKey=myValue, it looks for a config where key is `myKey` and value contains string `myValue`.
-    // If the propertyFilter string does not have `=`, it considers the string as a key and just looks for its existence.
-    // Multiple occurrences of `=` in propertyFilter are not supported and ignored completely.
-    if (flowSpecSearchObject.getPropertyFilter() != null) {
-      String propertyFilter = flowSpecSearchObject.getPropertyFilter();
-      Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
-      for (String property : commaSplitter.splitToList(propertyFilter)) {
-        if (property.contains("=")) {
-          String[] keyValue = property.split("=");
-          if (keyValue.length != 2) {
-            log.error("Incorrect flow config search query");
-            continue;
-          }
-          conditions.add("spec_json->'$.configAsProperties.\"" + keyValue[0] + "\"' like " + "'%" + keyValue[1] + "%'");
-        } else {
-          conditions.add("spec_json->'$.configAsProperties.\"" + property + "\"' is not null");
-        }
-      }
-    }
-
-    if (conditions.size() == 0) {
-      throw new IOException("At least one condition is required to query flow configs.");
-    }
-
-    return baseStatement + String.join(" AND ", conditions);
-  }
-
-  private static void setGetPreparedStatement(PreparedStatement statement, FlowSpecSearchObject flowSpecSearchObject)
-      throws SQLException {
-    int i = 0;
-
-    if (flowSpecSearchObject.getFlowSpecUri() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowSpecUri().toString());
-    }
-
-    if (flowSpecSearchObject.getFlowGroup() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowGroup());
-    }
-
-    if (flowSpecSearchObject.getFlowName() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowName());
-    }
-
-    if (flowSpecSearchObject.getTemplateURI() != null) {
-      statement.setString(++i, flowSpecSearchObject.getTemplateURI());
-    }
-
-    if (flowSpecSearchObject.getUserToProxy() != null) {
-      statement.setString(++i, flowSpecSearchObject.getUserToProxy());
-    }
-
-    if (flowSpecSearchObject.getSourceIdentifier() != null) {
-      statement.setString(++i, flowSpecSearchObject.getSourceIdentifier());
-    }
-
-    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
-      statement.setString(++i, flowSpecSearchObject.getDestinationIdentifier());
-    }
-
-    if (flowSpecSearchObject.getSchedule() != null) {
-      statement.setString(++i, flowSpecSearchObject.getModifiedTimestamp());
-    }
-
-    if (flowSpecSearchObject.getIsRunImmediately() != null) {
-      statement.setBoolean(++i, flowSpecSearchObject.getIsRunImmediately());
-    }
-
-    if (flowSpecSearchObject.getOwningGroup() != null) {
-      statement.setString(++i, flowSpecSearchObject.getOwningGroup());
-    }
-  }
-
-  protected void setAddPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
-    FlowSpec flowSpec = (FlowSpec) spec;
-    URI specUri = flowSpec.getUri();
-    Config flowConfig = flowSpec.getConfig();
-    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    String templateURI = new Gson().toJson(flowSpec.getTemplateURIs());
-    String userToProxy = ConfigUtils.getString(flowSpec.getConfig(), "user.to.proxy", null);
-    String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
-    String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
-    String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
-    String owningGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
-    boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
-
-    int i = 0;
-    statement.setString(++i, specUri.toString());
-    statement.setString(++i, flowGroup);
-    statement.setString(++i, flowName);
-    statement.setString(++i, templateURI);
-    statement.setString(++i, userToProxy);
-    statement.setString(++i, sourceIdentifier);
-    statement.setString(++i, destinationIdentifier);
-    statement.setString(++i, schedule);
-    statement.setString(++i, tagValue);
-    statement.setBoolean(++i, isRunImmediately);
-    statement.setString(++i, owningGroup);
-    statement.setBlob(++i, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
-    statement.setString(++i, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+    return withPreparedStatement(specSearchObject.augmentBaseGetStatement(this.sqlStatements.getStatementBase), statement -> {
+      specSearchObject.completePreparedStatement(statement); // NOTE(review): removed impl. bound getModifiedTimestamp() under the `schedule` guard; confirm binding order now matches the WHERE clause
+      return retrieveSpecs(statement);
+    }); // NOTE(review): removed impl. spliced `propertyFilter` text directly into SQL; confirm FlowSpecSearchObject no longer does
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
new file mode 100644
index 0000000..1a68be2
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MysqlBaseSpecStoreTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "base_spec_store";
+
+  private MysqlBaseSpecStore specStore;
+  private final URI uri1 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "1");
+  private final URI uri2 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "2");
+  private TopologySpec topoSpec1, topoSpec2;
+
+  public MysqlBaseSpecStoreTest()
+      throws URISyntaxException { // required because the `uri1`/`uri2` field initializers above call `new URI(...)`
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    // prefixed keys demonstrate the disambiguation mechanism, side-stepping the intentionally-sabotaged non-prefixed 'fallback'
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !" + testDb.getJdbcUrl())
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.specStore = new MysqlBaseSpecStore(config, new JavaSpecSerDe());
+
+    topoSpec1 = new TopologySpec.Builder(this.uri1)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key", "value")
+            .addPrimitive("key3", "value3")
+            .addPrimitive("config.with.dot", "value4").build())
+        .withDescription("Test1")
+        .withVersion("Test version")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execA")))
+        .build();
+    topoSpec2 = new TopologySpec.Builder(this.uri2)
+        .withConfig(ConfigBuilder.create().addPrimitive("converter", "value1,value2,value3")
+            .addPrimitive("key3", "value3").build())
+        .withDescription("Test2")
+        .withVersion("Test version 2")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execB")))
+        .build();
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void testSpecSearchUnsupported() throws Exception {
+    FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().flowSpecUri(this.uri1).build(); // non-empty criteria
+    this.specStore.getSpecs(flowSpecSearchObject); // base store offers no search: must throw, whatever the criteria
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    this.specStore.addSpec(this.topoSpec1);
+    this.specStore.addSpec(this.topoSpec2);
+
+    Assert.assertEquals(this.specStore.getSize(), 2);
+    Assert.assertTrue(this.specStore.exists(this.uri1));
+    Assert.assertTrue(this.specStore.exists(this.uri2));
+    Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testGetSpec() throws Exception {
+    TopologySpec result = (TopologySpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(result, this.topoSpec1);
+
+    Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.topoSpec1));
+    Assert.assertTrue(specs.contains(this.topoSpec2));
+
+    Iterator<URI> uris = this.specStore.getSpecURIs(); // NB: `Iterators.contains` advances `uris`, so re-fetch for the next check
+    Assert.assertTrue(Iterators.contains(uris, this.uri1));
+    Assert.assertTrue(Iterators.contains(this.specStore.getSpecURIs(), this.uri2));
+  }
+
+  @Test (dependsOnMethods = "testGetSpec")
+  public void testGetSpecWithTag() throws Exception {
+    // create and insert two specs sharing the "dr" tag
+    URI uri5 = URI.create("topospec5");
+    TopologySpec topoSpec5 = new TopologySpec.Builder(uri5)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key5", "value5").build())
+        .withDescription("Test5")
+        .withVersion("Test version 5")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execE")))
+        .build();
+
+    URI uri6 = URI.create("topospec6");
+    TopologySpec topoSpec6 = new TopologySpec.Builder(uri6)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key6", "value6").build())
+        .withDescription("Test6")
+        .withVersion("Test version 6")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execF")))
+        .build();
+
+    this.specStore.addSpec(topoSpec5, "dr");
+    this.specStore.addSpec(topoSpec6, "dr");
+
+    Assert.assertTrue(this.specStore.exists(uri5));
+    Assert.assertTrue(this.specStore.exists(uri6));
+    List<URI> result = new ArrayList<>();
+    this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+    Assert.assertEquals(result.size(), 2);
+  }
+
+  @Test (dependsOnMethods = "testGetSpecWithTag")
+  public void testDeleteSpec() throws Exception {
+    Assert.assertEquals(this.specStore.getSize(), 4);
+    this.specStore.deleteSpec(this.uri1);
+    Assert.assertEquals(this.specStore.getSize(), 3);
+    Assert.assertFalse(this.specStore.exists(this.uri1));
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b382e76..b7e9df5 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -66,7 +66,7 @@ public class MysqlSpecStoreTest {
private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
public MysqlSpecStoreTest()
-      throws URISyntaxException {
+      throws URISyntaxException { // declared for the field initializers above (e.g. `uri1`), which presumably call `new URI(...)`
}
@BeforeClass
@@ -131,7 +131,7 @@ public class MysqlSpecStoreTest {
public void testSpecSearch() throws Exception {
// empty FlowSpecSearchObject should throw an error
FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().build();
-    MysqlSpecStore.createGetPreparedStatement(flowSpecSearchObject, "table");
+    flowSpecSearchObject.augmentBaseGetStatement("SELECT * FROM Dummy WHERE "); // exercises query-building alone; no DB access
}
@Test
@@ -274,8 +274,8 @@ public class MysqlSpecStoreTest {
@Override
public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
- setAddPreparedStatement(statement, spec, tagValue);
+ PreparedStatement statement = connection.prepareStatement(this.sqlStatements.insertStatement)) {
+ this.sqlStatements.completeInsertPreparedStatement(statement, spec, tagValue);
statement.setString(4, null);
statement.executeUpdate();
connection.commit();