You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 20:12:49 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1168] add metrics in all SpecStore implementations
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3f84a01 [GOBBLIN-1168] add metrics in all SpecStore implementations
3f84a01 is described below
commit 3f84a013debd8abcb5af13eca51e8b803dc8e578
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 1 13:12:35 2020 -0700
[GOBBLIN-1168] add metrics in all SpecStore implementations
Closes #3001 from arjun4084346/flowSpecFields2
---
.../service/FlowConfigResourceLocalHandler.java | 49 +------
gobblin-runtime/build.gradle | 2 +
.../org/apache/gobblin/runtime/api/FlowSpec.java | 49 +++++++
.../gobblin/runtime/api/InstrumentedSpecStore.java | 162 +++++++++++++++++++++
.../gobblin/runtime/spec_store/FSSpecStore.java | 48 +++---
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 61 ++++++--
.../runtime/spec_store/MysqlSpecStoreTest.java | 85 ++++++++---
...ControllerUserDefinedMessageHandlerFactory.java | 2 +-
.../service/modules/orchestration/DagManager.java | 6 +-
.../modules/orchestration/DagManagerFlowTest.java | 8 +-
10 files changed, 357 insertions(+), 115 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 49e5a72..eaa62a5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -81,7 +81,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
try {
- URI flowUri = FlowUriUtils.createFlowSpecUri(flowId);
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
FlowConfig flowConfig = new FlowConfig();
Properties flowProps = spec.getConfigAsProperties();
@@ -207,7 +207,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
URI flowUri = null;
try {
- flowUri = FlowUriUtils.createFlowSpecUri(flowId);
+ flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
this.flowCatalog.remove(flowUri, header, triggerListener);
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (URISyntaxException e) {
@@ -282,49 +282,4 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
}
}
-
- public static class FlowUriUtils {
- private final static String URI_SCHEME = "gobblin-flow";
- private final static String URI_AUTHORITY = null;
- private final static String URI_PATH_SEPARATOR = "/";
- private final static String URI_QUERY = null;
- private final static String URI_FRAGMENT = null;
- private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
-
- public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
- return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
- }
-
- private static String createUriPath(FlowId flowId) {
- return URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
- }
-
- /**
- * returns the flow name from the flowUri
- * @param flowUri FlowUri
- * @return null if the provided flowUri is not valid
- */
- public static String getFlowName(URI flowUri) {
- String[] uriTokens = flowUri.getPath().split("/");
- if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
- log.error("Invalid URI {}.", flowUri);
- return null;
- }
- return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 1];
- }
-
- /**
- * returns the flow group from the flowUri
- * @param flowUri FlowUri
- * @return null if the provided flowUri is not valid
- */
- public static String getFlowGroup(URI flowUri) {
- String[] uriTokens = flowUri.getPath().split("/");
- if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
- log.error("Invalid URI {}.", flowUri);
- return null;
- }
- return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
- }
- }
}
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 306036c..8b0e4e9 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -44,6 +44,8 @@ dependencies {
compile project(":gobblin-utility")
compile project(path: ':gobblin-rest-service:gobblin-rest-api', configuration: 'restClient')
compile project(path: ':gobblin-rest-service:gobblin-rest-api', configuration: 'dataTemplate')
+ compile project(path: ':gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-api', configuration: 'restClient')
+ compile project(path: ':gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-api', configuration: 'dataTemplate')
compile project(":gobblin-rest-service:gobblin-rest-server")
compile project(":gobblin-modules:google-ingestion")
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 8721e41..fb27380 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -34,8 +34,11 @@ import java.util.Properties;
import java.util.Set;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.util.ConfigUtils;
@@ -358,4 +361,50 @@ public class FlowSpec implements Configurable, Spec {
public Boolean isExplain() {
return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
}
+
+ @Slf4j
+ public static class Utils {
+ private final static String URI_SCHEME = "gobblin-flow";
+ private final static String URI_AUTHORITY = null;
+ private final static String URI_PATH_SEPARATOR = "/";
+ private final static String URI_QUERY = null;
+ private final static String URI_FRAGMENT = null;
+ private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
+
+ public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+ return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
+ }
+
+ private static String createUriPath(FlowId flowId) {
+ return URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
+ }
+
+ /**
+ * returns the flow name from the flowUri
+ * @param flowUri FlowUri
+ * @return null if the provided flowUri is not valid
+ */
+ public static String getFlowName(URI flowUri) {
+ String[] uriTokens = flowUri.getPath().split("/");
+ if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+ log.error("Invalid URI {}.", flowUri);
+ return null;
+ }
+ return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 1];
+ }
+
+ /**
+ * returns the flow group from the flowUri
+ * @param flowUri FlowUri
+ * @return null if the provided flowUri is not valid
+ */
+ public static String getFlowGroup(URI flowUri) {
+ String[] uriTokens = flowUri.getPath().split("/");
+ if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+ log.error("Invalid URI {}.", flowUri);
+ return null;
+ }
+ return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
+ }
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
new file mode 100644
index 0000000..442fdc9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Instrumented version of {@link SpecStore} automatically capturing certain metrics.
+ * Subclasses should implement addSpecImpl instead of addSpec and so on.
+ */
+public abstract class InstrumentedSpecStore implements SpecStore {
+ private Optional<Timer> getTimer;
+ private Optional<Timer> existsTimer;
+ private Optional<Timer> deleteTimer;
+ private Optional<Timer> addTimer;
+ private Optional<Timer> updateTimer;
+ private Optional<Timer> getAllTimer;
+ private Optional<Timer> getURIsTimer;
+ private MetricContext metricContext;
+ private final boolean instrumentationEnabled;
+
+ public InstrumentedSpecStore(Config config, SpecSerDe specSerDe) {
+ this.instrumentationEnabled = GobblinMetrics.isEnabled(new State(ConfigUtils.configToProperties(config)));
+ this.metricContext = Instrumented.getMetricContext(new State(), getClass());
+ this.getTimer = createTimer("-GET");
+ this.existsTimer = createTimer("-EXISTS");
+ this.deleteTimer = createTimer("-DELETE");
+ this.addTimer = createTimer("-ADD");
+ this.updateTimer = createTimer("-UPDATE");
+ this.getAllTimer = createTimer("-GETALL");
+ this.getURIsTimer = createTimer("-GETURIS");
+ }
+
+ private Optional<Timer> createTimer(String suffix) {
+ return instrumentationEnabled
+ ? Optional.of(this.metricContext.timer(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,getClass().getSimpleName(), suffix)))
+ : Optional.absent();
+ }
+
+ @Override
+ public boolean exists(URI specUri) throws IOException {
+ if (!instrumentationEnabled) {
+ return existsImpl(specUri);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ boolean ret = existsImpl(specUri);
+ Instrumented.updateTimer(this.existsTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return ret;
+ }
+ }
+
+ @Override
+ public void addSpec(Spec spec) throws IOException {
+ if (!instrumentationEnabled) {
+ addSpecImpl(spec);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ addSpecImpl(spec);
+ Instrumented.updateTimer(this.addTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public boolean deleteSpec(URI specUri) throws IOException {
+ if (!instrumentationEnabled) {
+ return deleteSpecImpl(specUri);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ boolean ret = deleteSpecImpl(specUri);
+ Instrumented.updateTimer(this.deleteTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return ret;
+ }
+ }
+
+ @Override
+ public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+ if (!instrumentationEnabled) {
+ return getSpecImpl(specUri);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ Spec spec = getSpecImpl(specUri);
+ Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return spec;
+ }
+ }
+
+ @Override
+ public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+ if (!instrumentationEnabled) {
+ return updateSpecImpl(spec);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ Spec ret = updateSpecImpl(spec);
+ Instrumented.updateTimer(this.updateTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return ret;
+ }
+ }
+
+ @Override
+ public Collection<Spec> getSpecs() throws IOException {
+ if (!instrumentationEnabled) {
+ return getSpecsImpl();
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ Collection<Spec> spec = getSpecsImpl();
+ Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return spec;
+ }
+ }
+
+ @Override
+ public Iterator<URI> getSpecURIs() throws IOException {
+ if (!instrumentationEnabled) {
+ return getSpecURIsImpl();
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ Iterator<URI> specURIs = getSpecURIsImpl();
+ Instrumented.updateTimer(this.getURIsTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return specURIs;
+ }
+ }
+
+ public abstract void addSpecImpl(Spec spec) throws IOException;
+ public abstract Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException;
+ public abstract boolean existsImpl(URI specUri) throws IOException;
+ public abstract Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException;
+ public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
+ public abstract Collection<Spec> getSpecsImpl() throws IOException;
+ public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index feee4a9..14a3069 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -17,23 +17,11 @@
package org.apache.gobblin.runtime.spec_store;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.util.PathUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,6 +33,21 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.util.PathUtils;
+
/**
* The Spec Store for file system to persist the Spec information.
@@ -53,7 +56,7 @@ import org.slf4j.LoggerFactory;
* 2. This implementation does not performs implicit version management.
* For implicit version management, please use a wrapper FSSpecStore.
*/
-public class FSSpecStore implements SpecStore {
+public class FSSpecStore extends InstrumentedSpecStore {
/***
* Configuration properties related to Spec Store
@@ -83,6 +86,7 @@ public class FSSpecStore implements SpecStore {
public FSSpecStore(Config sysConfig, SpecSerDe specSerDe, Optional<Logger> log)
throws IOException {
+ super(sysConfig, specSerDe);
Preconditions.checkArgument(sysConfig.hasPath(SPECSTORE_FS_DIR_KEY),
"FS SpecStore path must be specified.");
@@ -141,7 +145,7 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public boolean exists(URI specUri) throws IOException {
+ public boolean existsImpl(URI specUri) throws IOException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
@@ -149,7 +153,7 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public void addSpec(Spec spec) throws IOException {
+ public void addSpecImpl(Spec spec) throws IOException {
Preconditions.checkArgument(null != spec, "Spec should not be null");
log.info(String.format("Adding Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
@@ -165,10 +169,10 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public boolean deleteSpec(URI specUri) throws IOException {
+ public boolean deleteSpecImpl(URI specUri) throws IOException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
- return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
+ return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
}
@Override
@@ -186,13 +190,13 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+ public Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException {
addSpec(spec);
return spec;
}
@Override
- public Spec getSpec(URI specUri) throws SpecNotFoundException {
+ public Spec getSpecImpl(URI specUri) throws SpecNotFoundException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
Collection<Spec> specs = getAllVersionsOfSpec(specUri);
@@ -228,7 +232,7 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public Collection<Spec> getSpecs() throws IOException {
+ public Collection<Spec> getSpecsImpl() throws IOException {
Collection<Spec> specs = Lists.newArrayList();
try {
getSpecs(this.fsSpecStoreDirPath, specs);
@@ -240,7 +244,7 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public Iterator<URI> getSpecURIs() throws IOException {
+ public Iterator<URI> getSpecURIsImpl() throws IOException {
final RemoteIterator<LocatedFileStatus> it = fs.listFiles(this.fsSpecStoreDirPath, true);
return new Iterator<URI>() {
@Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 53a2b7e..a5a0d2b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -33,6 +33,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
import com.typesafe.config.Config;
import javax.sql.DataSource;
@@ -41,6 +42,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
@@ -48,6 +51,9 @@ import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+
/**
* Implementation of {@link SpecStore} that stores specs as serialized java objects in MySQL. Note that versions are not
@@ -59,7 +65,7 @@ import org.apache.gobblin.util.ConfigUtils;
* but not removing it from {@link SpecStore}.
*/
@Slf4j
-public class MysqlSpecStore implements SpecStore {
+public class MysqlSpecStore extends InstrumentedSpecStore {
public static final String CONFIG_PREFIX = "mysqlSpecStore";
public static final String DEFAULT_TAG_VALUE = "";
private static final String NEW_COLUMN = "spec_json";
@@ -72,8 +78,9 @@ public class MysqlSpecStore implements SpecStore {
+ "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
+ "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
- protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec, " + NEW_COLUMN + ") "
- + "VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
+ protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+ + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
@@ -86,6 +93,7 @@ public class MysqlSpecStore implements SpecStore {
protected final SpecSerDe specSerDe;
public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+ super(config, specSerDe);
if (config.hasPath(CONFIG_PREFIX)) {
config = config.getConfig(CONFIG_PREFIX).withFallback(config);
}
@@ -104,7 +112,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
- public boolean exists(URI specUri) throws IOException {
+ public boolean existsImpl(URI specUri) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
statement.setString(1, specUri.toString());
@@ -118,7 +126,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
- public void addSpec(Spec spec) throws IOException {
+ public void addSpecImpl(Spec spec) throws IOException {
this.addSpec(spec, DEFAULT_TAG_VALUE);
}
@@ -128,10 +136,7 @@ public class MysqlSpecStore implements SpecStore {
public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
- statement.setString(1, spec.getUri().toString());
- statement.setString(2, tagValue);
- statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
- statement.setString(4, new String(this.specSerDe.serialize(spec), Charsets.UTF_8));
+ setPreparedStatement(statement, spec, tagValue);
statement.executeUpdate();
connection.commit();
} catch (SQLException | SpecSerDeException e) {
@@ -145,7 +150,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
- public boolean deleteSpec(URI specUri) throws IOException {
+ public boolean deleteSpecImpl(URI specUri) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
statement.setString(1, specUri.toString());
@@ -164,13 +169,13 @@ public class MysqlSpecStore implements SpecStore {
@Override
// TODO : this method is not doing what the contract is in the SpecStore interface
- public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+ public Spec updateSpecImpl(Spec spec) throws IOException {
addSpec(spec);
return spec;
}
@Override
- public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+ public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
statement.setString(1, specUri.toString());
@@ -199,7 +204,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
- public Collection<Spec> getSpecs() throws IOException {
+ public Collection<Spec> getSpecsImpl() throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
List<Spec> specs = new ArrayList<>();
@@ -217,7 +222,6 @@ public class MysqlSpecStore implements SpecStore {
}
}
}
-
return specs;
} catch (SQLException e) {
throw new IOException(e);
@@ -225,7 +229,7 @@ public class MysqlSpecStore implements SpecStore {
}
@Override
- public Iterator<URI> getSpecURIs() throws IOException {
+ public Iterator<URI> getSpecURIsImpl() throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
return getURIIteratorByQuery(statement);
@@ -262,4 +266,31 @@ public class MysqlSpecStore implements SpecStore {
public Optional<URI> getSpecStoreURI() {
return Optional.of(this.specStoreURI);
}
+
+ protected void setPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ URI specUri = flowSpec.getUri();
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String templateURI = new Gson().toJson(flowSpec.getTemplateURIs());
+ String userToProxy = ConfigUtils.getString(flowSpec.getConfig(), "user.to.proxy", null);
+ String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
+ String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
+ String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+ boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
+
+ statement.setString(1, specUri.toString());
+ statement.setString(2, flowGroup);
+ statement.setString(3, flowName);
+ statement.setString(4, templateURI);
+ statement.setString(5, userToProxy);
+ statement.setString(6, sourceIdentifier);
+ statement.setString(7, destinationIdentifier);
+ statement.setString(8, schedule);
+ statement.setString(9, tagValue);
+ statement.setBoolean(10, isRunImmediately);
+ statement.setBlob(11, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
+ statement.setString(12, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+ }
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 16e08b8..6762f3c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.spec_store;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -28,11 +29,14 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
+
import org.apache.commons.lang3.ArrayUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Charsets;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
@@ -45,6 +49,10 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
+import org.apache.gobblin.service.FlowId;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
public class MysqlSpecStoreTest {
@@ -54,24 +62,14 @@ public class MysqlSpecStoreTest {
private MysqlSpecStore specStore;
private MysqlSpecStore oldSpecStore;
- private URI uri1 = URI.create("flowspec1");
- private URI uri2 = URI.create("flowspec2");
- private URI uri3 = URI.create("flowspec3");
- private FlowSpec flowSpec1 = FlowSpec.builder(this.uri1)
- .withConfig(ConfigBuilder.create().addPrimitive("key", "value").build())
- .withDescription("Test flow spec")
- .withVersion("Test version")
- .build();
- private FlowSpec flowSpec2 = FlowSpec.builder(this.uri2)
- .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2").build())
- .withDescription("Test flow spec 2")
- .withVersion("Test version 2")
- .build();
- private FlowSpec flowSpec3 = FlowSpec.builder(this.uri3)
- .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3").build())
- .withDescription("Test flow spec 3")
- .withVersion("Test version 3")
- .build();
+ private URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+ private URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+ private URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+ private FlowSpec flowSpec1, flowSpec2, flowSpec3;
+
+ public MysqlSpecStoreTest()
+ throws URISyntaxException {
+ }
@BeforeClass
public void setUp() throws Exception {
@@ -86,6 +84,39 @@ public class MysqlSpecStoreTest {
this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
+
+ // The three specs share the same source/destination identifiers; they differ in flow group,
+ // flow name, the extra "key"/"key2"/"key3" entry, description and version.
+ flowSpec1 = FlowSpec.builder(this.uri1)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive("key", "value")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn1").build())
+ .withDescription("Test flow spec")
+ .withVersion("Test version")
+ .build();
+ flowSpec2 = FlowSpec.builder(this.uri2)
+ .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn2").build())
+ .withDescription("Test flow spec 2")
+ .withVersion("Test version 2")
+ .build();
+ flowSpec3 = FlowSpec.builder(this.uri3)
+ .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn3").build())
+ .withDescription("Test flow spec 3")
+ .withVersion("Test version 3")
+ .build();
}
@Test
@@ -118,14 +149,24 @@ public class MysqlSpecStoreTest {
//Creating and inserting flowspecs with tags
URI uri4 = URI.create("flowspec4");
FlowSpec flowSpec4 = FlowSpec.builder(uri4)
- .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4").build())
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive("key4", "value4").build())
.withDescription("Test flow spec 4")
.withVersion("Test version 4")
.build();
URI uri5 = URI.create("flowspec5");
FlowSpec flowSpec5 = FlowSpec.builder(uri5)
- .withConfig(ConfigBuilder.create().addPrimitive("key5", "value5").build())
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
+ .addPrimitive("key5", "value5").build())
.withDescription("Test flow spec 5")
.withVersion("Test version 5")
.build();
@@ -173,9 +214,7 @@ public class MysqlSpecStoreTest {
public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
- statement.setString(1, spec.getUri().toString());
- statement.setString(2, tagValue);
- statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+ setPreparedStatement(statement, spec, tagValue);
statement.setString(4, null);
statement.executeUpdate();
connection.commit();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index 6181ff9..08a9cb5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -152,7 +152,7 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
FlowId id = FlowConfigUtils.deserializeFlowId(msg);
if (flowCatalogLocalCommit) {
// in balance mode, flow spec is already deleted in flow catalog on standby node.
- URI flowUri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(id);
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(id);
log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", flowUri);
jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION);
} else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e2a11b6..f323ee3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -62,12 +62,12 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -284,8 +284,8 @@ public class DagManager extends AbstractIdleService {
* The {@link DagManager} adds the dag to the {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
*/
synchronized public void stopDag(URI uri) throws IOException {
- String flowGroup = FlowConfigResourceLocalHandler.FlowUriUtils.getFlowGroup(uri);
- String flowName = FlowConfigResourceLocalHandler.FlowUriUtils.getFlowName(uri);
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
log.info("Found {} flows to cancel.", flowExecutionIds.size());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index edc881f..85b8bca 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -37,9 +37,9 @@ import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -103,9 +103,9 @@ public class DagManagerFlowTest {
assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
// mock delete spec
- dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
- dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
- dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
+ dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
+ dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
+ dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
// verify deleteSpec() of specProducer is called once
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);