You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 21:44:28 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1160] No spec delete on gobblin service start
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc395c [GOBBLIN-1160] No spec delete on gobblin service start
6bc395c is described below
commit 6bc395c3c1428852a30b5add8db7402be7b4b1ae
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 1 14:44:20 2020 -0700
[GOBBLIN-1160] No spec delete on gobblin service start
Closes #3011 from arjun4084346/noSpecRemoveOnStart
---
.../org/apache/gobblin/runtime/api/FlowSpec.java | 7 +-
.../runtime/job_monitor/KafkaJobMonitor.java | 11 +-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 66 +++++---
.../modules/core/GobblinServiceManager.java | 2 +-
.../service/modules/flow/MockedSpecCompiler.java | 11 +-
.../scheduler/GobblinServiceJobScheduler.java | 133 ++++++++++------
.../gobblin/service/GobblinServiceManagerTest.java | 175 +++++++++++++++++----
7 files changed, 293 insertions(+), 112 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index fb27380..bec0445 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -362,6 +362,10 @@ public class FlowSpec implements Configurable, Spec {
return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
}
+ public boolean isScheduled() {
+ return getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY);
+ }
+
@Slf4j
public static class Utils {
private final static String URI_SCHEME = "gobblin-flow";
@@ -371,7 +375,8 @@ public class FlowSpec implements Configurable, Spec {
private final static String URI_FRAGMENT = null;
private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
- public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+ public static URI createFlowSpecUri(FlowId flowId)
+ throws URISyntaxException {
return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 0f47625..ba9f06f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,12 +24,14 @@ import java.util.Collection;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
import com.typesafe.config.Config;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -38,9 +40,6 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
/**
* Abstract {@link JobSpecMonitor} that reads {@link JobSpec}s from a Kafka stream. Subclasses should implement
@@ -124,7 +123,7 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
* It fetches the job name from the given jobSpecUri
* and deletes its corresponding state store
* @param jobSpecUri jobSpecUri as created by
- * {@link FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri}
+ * {@link FlowSpec.Utils#createFlowSpecUri}
* @throws IOException
*/
private void deleteStateStore(URI jobSpecUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 029005c..b64ebd7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.runtime.spec_catalog;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -33,8 +27,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import javax.annotation.Nonnull;
+
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import javax.annotation.Nonnull;
+import lombok.Getter;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
@@ -55,8 +60,6 @@ import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -78,7 +81,11 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
protected final Logger log;
protected final MetricContext metricContext;
protected final MutableStandardMetrics metrics;
+ @Getter
protected final SpecStore specStore;
+ // a map which keeps a handle of condition variables for each spec being added to the flow catalog
+ // to provide synchronization needed for flow specs
+ private final Map<String, Object> specSyncObjects = new HashMap<>();
private final ClassAliasResolver<SpecStore> aliasResolver;
@@ -280,7 +287,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
try {
spec = getSpec(uri);
} catch (SpecNotFoundException snfe) {
- log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatlog"
+ log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog"
+ ", suspecting current modification on SpecStore", uri), snfe);
}
return spec;
@@ -289,6 +296,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
/**
* Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
* is set to true.
+ * If the {@link Spec} is a {@link FlowSpec}, it is persisted if it can be compiled at the time this method receives
+ * the spec. `explain` specs are not persisted. The logic of this method is tightly coupled with the logic of
+ * {@link GobblinServiceJobScheduler#onAddSpec()}, which is one of the listeners of {@link FlowCatalog}.
+ * We use condition variables {@link #specSyncObjects} to achieve synchronization between the
+ * {@link GobblinServiceJobScheduler#NonScheduledJobRunner} thread and this thread, to ensure that deletion of the
+ * {@link FlowSpec} happens after the corresponding run-once flow is submitted to the orchestrator.
*
* @param spec The Spec to be added
* @param triggerListener True if listeners should be notified.
@@ -301,13 +314,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
Preconditions.checkNotNull(flowSpec);
log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
- try {
- long startTime = System.currentTimeMillis();
- specStore.addSpec(flowSpec);
- metrics.updatePutSpecTime(startTime);
- } catch (IOException e) {
- throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
- }
+
+ Object syncObject = new Object();
+ specSyncObjects.put(flowSpec.getUri().toString(), syncObject);
if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
@@ -317,7 +326,21 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
if (isCompileSuccessful(responseMap)) {
- responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+ synchronized (syncObject) {
+ try {
+ if (!flowSpec.isExplain()) {
+ long startTime = System.currentTimeMillis();
+ specStore.addSpec(spec);
+ metrics.updatePutSpecTime(startTime);
+ }
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+ } finally {
+ syncObject.notifyAll();
+ this.specSyncObjects.remove(flowSpec.getUri().toString());
+ }
+ }
} else {
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
}
@@ -326,7 +349,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
- AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+ AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
+ ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
return isCompileSuccessful(addSpecResponse.getValue());
}
@@ -364,4 +388,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
}
}
+
+ public Object getSyncObject(String specUri) {
+ return this.specSyncObjects.getOrDefault(specUri, null);
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index fefbfa2..cb9e161 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -310,7 +310,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
});
this.restliServer = EmbeddedRestliServer.builder()
- .resources(Lists.<Class<? extends BaseResource>>newArrayList(FlowConfigsResource.class))
+ .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
.injector(injector)
.build();
if (config.hasPath(ServiceConfigKeys.SERVICE_PORT)) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
index 7a9e5db..09e3504 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
@@ -36,12 +36,14 @@ import org.apache.gobblin.util.ConfigUtils;
/**
- * This mocked SpecCompiler class creates 3 dummy job specs to emulate multi hop flow spec compiler.
+ * This mocked SpecCompiler class creates 3 dummy job specs to emulate flow spec compiler.
+ * It can also be made to compile in a certain way, or not to compile at all, so that negative test cases can be written.
* It uses {@link InMemorySpecExecutor} for these dummy specs.
*/
public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
private static final int NUMBER_OF_JOBS = 3;
+ public static final String UNCOMPILABLE_FLOW = "uncompilableFlow";
public MockedSpecCompiler(Config config) {
super(config);
@@ -49,6 +51,11 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
@Override
public Dag<JobExecutionPlan> compileFlow(Spec spec) {
+ String flowName = (String) ((FlowSpec) spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY);
+ if (flowName.equalsIgnoreCase(UNCOMPILABLE_FLOW)) {
+ return null;
+ }
+
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
long flowExecutionId = System.currentTimeMillis();
@@ -57,7 +64,7 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
while(i++ < NUMBER_OF_JOBS) {
String specUri = "/foo/bar/spec/" + i;
Properties properties = new Properties();
- properties.put(ConfigurationKeys.FLOW_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY));
+ properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY));
properties.put(ConfigurationKeys.JOB_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY) + "_" + i);
properties.put(ConfigurationKeys.JOB_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY) + "_" + i);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 4f2c9ff..5c70807 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -254,7 +254,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
- /** {@inheritDoc} */
+ /**
+ *
+ * @param addedSpec spec to be added
+ * @return add spec response
+ */
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
@@ -266,63 +270,55 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
_log.info("New Flow Spec detected: " + addedSpec);
- if (addedSpec instanceof FlowSpec) {
- try {
- FlowSpec flowSpec = (FlowSpec) addedSpec;
- URI flowSpecUri = flowSpec.getUri();
- Properties jobConfig = new Properties();
- Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties();
- jobConfig.putAll(this.properties);
- jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, addedSpec.getUri().toString());
- jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
- flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
- jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
- if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
- flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
- jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
- flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
- }
- boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
- String response = null;
-
- // always try to compile the flow to verify if it is compilable
- Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
- if (dag != null && !dag.isEmpty()) {
- response = dag.toString();
- } else if (!flowSpec.getCompilationErrors().isEmpty()) {
- response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
- }
+ if (!(addedSpec instanceof FlowSpec)) {
+ return null;
+ }
- boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
+ FlowSpec flowSpec = (FlowSpec) addedSpec;
+ URI flowSpecUri = flowSpec.getUri();
+ Properties jobConfig = createJobConfig(flowSpec);
+ boolean isExplain = flowSpec.isExplain();
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+ response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
+ }
- if (!isExplain && compileSuccess) {
- this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+ boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
- if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
- scheduleJob(jobConfig, null);
- if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
- this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
- }
- } else {
- _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
- this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
- }
- } else {
- _log.info("Removing the flow spec: {}, isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
- if (this.flowCatalog.isPresent()) {
- _log.debug("Removing flow spec from FlowCatalog: {}", flowSpec);
- GobblinServiceJobScheduler.this.flowCatalog.get().remove(flowSpecUri, new Properties(), false);
- }
- }
- return new AddSpecResponse<>(response);
+ if (isExplain || !compileSuccess) {
+ // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
+ // so it can be scheduled
+ _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
+ return new AddSpecResponse<>(response);
+ }
+
+ // todo : we should probably not schedule a flow if it is a runOnce flow
+ this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+
+ if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
+ try {
+ scheduleJob(jobConfig, null);
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
+ this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
+ return null;
+ }
+ if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
}
+ } else {
+ _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
}
- return null;
+
+ return new AddSpecResponse<>(response);
}
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
@@ -348,7 +344,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
unscheduleJob(deletedSpecURI.toString());
} else {
_log.warn(String.format(
- "Spec with URI: %s was not found in cache. May be it was cleaned, if not please " + "clean it manually",
+ "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
deletedSpecURI));
}
} catch (JobException | IOException e) {
@@ -384,6 +380,27 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
+ private Properties createJobConfig(FlowSpec flowSpec) {
+ Properties jobConfig = new Properties();
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+
+ jobConfig.putAll(this.properties);
+ jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getUri().toString());
+ jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
+ flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
+ jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
+
+ // todo : we should check if the job schedule is a valid cron schedule
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ }
+
+ return jobConfig;
+ }
+
/**
* A Gobblin job to be scheduled.
*/
@@ -435,10 +452,22 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
try {
GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener);
if (flowCatalog.isPresent() && removeSpec) {
+ Object syncObject = GobblinServiceJobScheduler.this.flowCatalog.get().getSyncObject(specUri.toString());
+ if (syncObject != null) {
+ // a null sync object means this job was set to run due to job submission at service restart; wait only when one exists
+ synchronized (syncObject) {
+ while (!GobblinServiceJobScheduler.this.flowCatalog.get().exists(specUri)) {
+ syncObject.wait();
+ }
+ }
+ }
GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
+ GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
}
} catch (JobException je) {
_log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ } catch (InterruptedException e) {
+ _log.error("Failed to delete the spec " + specUri, e);
}
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 282ede6..d1e793f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service;
import java.io.File;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -59,7 +60,9 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.core.GitConfigMonitor;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
@@ -80,6 +83,10 @@ public class GobblinServiceManagerTest {
private static final String TEST_GROUP_NAME = "testGroup";
private static final String TEST_FLOW_NAME = "testFlow";
+ private static final FlowId TEST_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+ private static final FlowId UNCOMPILABLE_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME)
+ .setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup";
@@ -88,11 +95,18 @@ public class GobblinServiceManagerTest {
private static final String TEST_SOURCE_NAME = "testSource";
private static final String TEST_SINK_NAME = "testSink";
+ private final URI TEST_URI = FlowSpec.Utils.createFlowSpecUri(TEST_FLOW_ID);
+
private MockGobblinServiceManager gobblinServiceManager;
- private FlowConfigClient flowConfigClient;
+ private FlowConfigV2Client flowConfigClient;
private Git gitForPush;
private TestingServer testingServer;
+ Properties serviceCoreProperties = new Properties();
+ Map<String, String> flowProperties = Maps.newHashMap();
+
+ public GobblinServiceManagerTest() throws Exception {
+ }
@BeforeClass
public void setup() throws Exception {
@@ -101,7 +115,11 @@ public class GobblinServiceManagerTest {
ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
testingServer = new TestingServer(true);
- Properties serviceCoreProperties = new Properties();
+
+ flowProperties.put("param1", "value1");
+ flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
+ flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
@@ -131,6 +149,8 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+ serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, MockedSpecCompiler.class.getCanonicalName());
+
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
fileKey.open(false).create(true);
@@ -145,8 +165,8 @@ public class GobblinServiceManagerTest {
ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
this.gobblinServiceManager.start();
- this.flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
- this.gobblinServiceManager.getRestLiServer().getPort()));
+ this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+ this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort()));
}
private void cleanUpDir(String dir) throws Exception {
@@ -183,50 +203,135 @@ public class GobblinServiceManagerTest {
}
}
+ /**
+ * Tests that an existing flow in the spec store does not get deleted just because it is not compilable during a service restart
+ */
@Test
- public void testCreate() throws Exception {
- Map<String, String> flowProperties = Maps.newHashMap();
- flowProperties.put("param1", "value1");
- flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
- flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+ public void testRestart() throws Exception {
+ FlowConfig uncompilableFlowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
+ .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
+ .setProperties(new StringMap(flowProperties));
+ FlowSpec uncompilableSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(uncompilableFlowConfig);
+ FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+ FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);
+
+ // add the non compilable flow directly to the spec store skipping flow catalog which would not allow this
+ this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(uncompilableSpec);
+ this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);
+
+ List<Spec> specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs();
+
+ Assert.assertEquals(specs.size(), 2);
+ if (specs.get(0).getUri().equals(uncompilableSpec.getUri())) {
+ Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri());
+ } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) {
+ Assert.assertEquals(specs.get(1).getUri(), uncompilableSpec.getUri());
+ } else {
+ Assert.fail();
+ }
+
+ // restart the service
+ serviceReboot();
+
+ // runOnce job should get deleted from the spec store after running but uncompilable flow should stay
+ AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
+ .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
+ "Waiting for the runOnce job to finish");
+
+ specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs();
+ Assert.assertEquals(specs.get(0).getUri(), uncompilableSpec.getUri());
+ Assert.assertTrue(uncompilableSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY));
+
+ // clean it
+ this.gobblinServiceManager.getFlowCatalog().remove(uncompilableSpec.getUri());
+ specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs();
+ Assert.assertEquals(specs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testRestart")
+ public void testUncompilableJob() throws Exception {
+ FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+ URI uri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ FlowConfig flowConfig = new FlowConfig().setId(flowId)
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+
+ RestLiResponseException exception = null;
+ try {
+ this.flowConfigClient.createFlowConfig(flowConfig);
+ } catch (RestLiResponseException e) {
+ exception = e;
+ }
+ Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);
+ // uncompilable job should not be persisted
+ Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
+ Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
+ }
+
+ @Test (dependsOnMethods = "testUncompilableJob")
+ public void testRunOnceJob() throws Exception {
+ FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+
+ this.flowConfigClient.createFlowConfig(flowConfig);
+
+ // runOnce job is deleted soon after it is orchestrated
+ AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+ .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0,
+ "Waiting for job to get orchestrated...");
+ AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(1000L).backoffFactor(1)
+ .assertTrue(input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
+ "Waiting for job to get orchestrated...");
+ }
+ @Test (dependsOnMethods = "testRunOnceJob")
+ public void testExplainJob() throws Exception {
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true);
+
+ this.flowConfigClient.createFlowConfig(flowConfig);
+
+ // explain job should not be persisted
+ Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
+ Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
+ }
+
+ @Test (dependsOnMethods = "testExplainJob")
+ public void testCreate() throws Exception {
+ FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
.setProperties(new StringMap(flowProperties));
this.flowConfigClient.createFlowConfig(flowConfig);
- Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, "Flow that was created is not "
- + "reflecting in FlowCatalog");
+ Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 1);
+ Assert.assertTrue(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
}
@Test (dependsOnMethods = "testCreate")
public void testCreateAgain() throws Exception {
- Map<String, String> flowProperties = Maps.newHashMap();
- flowProperties.put("param1", "value1");
- flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
- flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
-
- FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));
+ RestLiResponseException exception = null;
try {
this.flowConfigClient.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Create Again should pass without complaining that the spec already exists.");
+ exception = e;
}
+ Assert.assertNotNull(exception);
+ Assert.assertEquals(exception.getStatus(), HttpStatus.CONFLICT_409);
}
@Test (dependsOnMethods = "testCreateAgain")
public void testGet() throws Exception {
- FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
- FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(flowId);
+ FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(TEST_FLOW_ID);
Assert.assertEquals(flowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+ Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
// Add this assert back when getFlowSpec() is changed to return the raw flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
@@ -250,6 +355,7 @@ public class GobblinServiceManagerTest {
FlowConfig retrievedFlowConfig = this.flowConfigClient.getFlowConfig(flowId);
+ Assert.assertTrue(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
Assert.assertEquals(retrievedFlowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
Assert.assertEquals(retrievedFlowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
@@ -263,6 +369,7 @@ public class GobblinServiceManagerTest {
@Test (dependsOnMethods = "testUpdate")
public void testDelete() throws Exception {
FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+ URI uri = FlowSpec.Utils.createFlowSpecUri(flowId);
// make sure flow config exists
FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(flowId);
@@ -275,6 +382,7 @@ public class GobblinServiceManagerTest {
this.flowConfigClient.getFlowConfig(flowId);
} catch (RestLiResponseException e) {
Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+ Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
return;
}
@@ -290,7 +398,7 @@ public class GobblinServiceManagerTest {
Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8);
Collection<Spec> specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
- Assert.assertTrue(specs.size() == 0);
+ Assert.assertEquals(specs.size(), 0);
// add, commit, push
this.gitForPush.add().addFilepattern("gobblin-config/testGroup/testFlow.pull").call();
@@ -300,14 +408,12 @@ public class GobblinServiceManagerTest {
// polling is every 5 seconds, so wait twice as long and check
TimeUnit.SECONDS.sleep(10);
- specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
- Assert.assertTrue(specs.size() == 1);
+ // specs generated by the git monitor do not have a schedule, so their life cycle should be similar to that of runOnce jobs
+ Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
- FlowSpec spec = (FlowSpec) (specs.iterator().next());
- Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
- Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
- Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
- Assert.assertEquals(spec.getConfig().getString("param1"), "value20");
+ AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+ .assertTrue(input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
+ "Waiting for job to get orchestrated...");
}
@Test
@@ -354,7 +460,15 @@ public class GobblinServiceManagerTest {
} catch (RestLiResponseException e) {
Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
}
- cleanUpDir(FLOW_SPEC_STORE_DIR);
+ }
+
+ private void serviceReboot() throws Exception {
+ this.gobblinServiceManager.stop();
+ this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
+ ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
+ this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+ this.gobblinServiceManager.getRestLiServer().getPort()));
+ this.gobblinServiceManager.start();
}
class MockGobblinServiceManager extends GobblinServiceManager {
@@ -368,6 +482,5 @@ public class GobblinServiceManagerTest {
protected EmbeddedRestliServer getRestLiServer() {
return this.restliServer;
}
-
}
}
\ No newline at end of file