You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/08 00:29:19 UTC
incubator-gobblin git commit: [GOBBLIN-538] Flow config v2 resource
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c2d59a1cb -> 1896d7fab
[GOBBLIN-538] Flow config v2 resource
Closes #2431 from
arjun4084346/FlowConfigV2Resource
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1896d7fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1896d7fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1896d7fa
Branch: refs/heads/master
Commit: 1896d7fab5ec43ac248fdb5bebf0218bed156d00
Parents: c2d59a1
Author: Arjun <ab...@linkedin.com>
Authored: Fri Sep 7 17:29:14 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Sep 7 17:29:14 2018 -0700
----------------------------------------------------------------------
....gobblin.service.flowconfigsV2.restspec.json | 31 +++
.../gobblin/service/FlowConfigV2Client.java | 213 +++++++++++++++++++
.../gobblin/service/FlowConfigV2Test.java | 134 ++++++++++++
.../service/FlowConfigResourceLocalHandler.java | 10 +-
.../FlowConfigV2ResourceLocalHandler.java | 50 +++++
.../gobblin/service/FlowConfigsV2Resource.java | 122 +++++++++++
.../modules/core/GobblinServiceManager.java | 11 +
.../modules/flow/BaseFlowToJobSpecCompiler.java | 34 +--
.../gobblin/service/modules/flow/FlowUtils.java | 36 ++++
.../flow/MultiHopsFlowToJobSpecCompiler.java | 47 ++--
.../pathfinder/AbstractPathFinder.java | 4 +-
11 files changed, 649 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
new file mode 100644
index 0000000..e45718e
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -0,0 +1,31 @@
+{
+ "name" : "flowconfigsV2",
+ "namespace" : "org.apache.gobblin.service",
+ "path" : "/flowconfigsV2",
+ "schema" : "org.apache.gobblin.service.FlowConfig",
+ "doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsV2Resource",
+ "collection" : {
+ "identifier" : {
+ "name" : "id",
+ "type" : "org.apache.gobblin.service.FlowId",
+ "params" : "org.apache.gobblin.service.FlowStatusId"
+ },
+ "supports" : [ "create", "delete", "get", "update" ],
+ "methods" : [ {
+ "method" : "create",
+ "doc" : "Create a flow configuration that the service will forward to execution instances for execution"
+ }, {
+ "method" : "get",
+ "doc" : "Retrieve the flow configuration with the given key"
+ }, {
+ "method" : "update",
+ "doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
+ }, {
+ "method" : "delete",
+ "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+ } ],
+ "entity" : {
+ "path" : "/flowconfigsV2/{id}"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
new file mode 100644
index 0000000..1471883
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.linkedin.common.callback.FutureCallback;
+import com.linkedin.common.util.None;
+import com.linkedin.r2.RemoteInvocationException;
+import com.linkedin.r2.transport.common.Client;
+import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.CreateIdRequest;
+import com.linkedin.restli.client.DeleteRequest;
+import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.Response;
+import com.linkedin.restli.client.ResponseFuture;
+import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.UpdateRequest;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.IdResponse;
+
+
+/**
+ * Flow Configuration client for REST flow configuration server
+ */
+public class FlowConfigV2Client implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowConfigV2Client.class);
+
+ private Optional<HttpClientFactory> _httpClientFactory;
+ private Optional<RestClient> _restClient;
+ private final FlowconfigsV2RequestBuilders _flowconfigsV2RequestBuilders;
+ public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
+ private static final Pattern flowStatusIdParams = Pattern.compile(".*params:\\((?<flowStatusIdParams>.*?)\\)");
+
+ /**
+ * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri
+ * @param serverUri address and port of the REST server
+ */
+ public FlowConfigV2Client(String serverUri) {
+ LOG.debug("FlowConfigClient with serverUri " + serverUri);
+
+ _httpClientFactory = Optional.of(new HttpClientFactory());
+ Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
+ _restClient = Optional.of(new RestClient(r2Client, serverUri));
+
+ _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders();
+ }
+
+ /**
+ * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri
+ * @param restClient restClient to send restli request
+ */
+ public FlowConfigV2Client(RestClient restClient) {
+ LOG.debug("FlowConfigV2Client with restClient " + restClient);
+
+ _httpClientFactory = Optional.absent();
+ _restClient = Optional.of(restClient);
+
+ _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders();
+ }
+
+ /**
+ * Create a flow configuration
+ * It differs from {@link FlowConfigClient} in a way that it returns FlowStatusId,
+ * which can be used to find the FlowExecutionId
+ * @param flowConfig FlowConfig to be used to create the flow
+ * @return FlowStatusId
+ * @throws RemoteInvocationException
+ */
+ public FlowStatusId createFlowConfig(FlowConfig flowConfig)
+ throws RemoteInvocationException {
+ LOG.debug("createFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
+ flowConfig.getId().getFlowName());
+
+ CreateIdRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> request =
+ _flowconfigsV2RequestBuilders.create().input(flowConfig).build();
+ ResponseFuture<IdResponse<ComplexResourceKey<FlowId, FlowStatusId>>> flowConfigResponseFuture =
+ _restClient.get().sendRequest(request);
+
+ return createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
+ }
+
+ private FlowStatusId createFlowStatusId(String locationHeader) {
+ Matcher matcher = flowStatusIdParams.matcher(locationHeader);
+ matcher.find();
+ String allFields = matcher.group("flowStatusIdParams");
+ String[] flowStatusIdParams = allFields.split(",");
+ Map<String, String> paramsMap = new HashMap<>();
+ for (String flowStatusIdParam : flowStatusIdParams) {
+ paramsMap.put(flowStatusIdParam.split(":")[0], flowStatusIdParam.split(":")[1]);
+ }
+ FlowStatusId flowStatusId = new FlowStatusId()
+ .setFlowName(paramsMap.get("flowName"))
+ .setFlowGroup(paramsMap.get("flowGroup"));
+ if (paramsMap.containsKey("flowExecutionId")) {
+ flowStatusId.setFlowExecutionId(Long.parseLong(paramsMap.get("flowExecutionId")));
+ }
+ return flowStatusId;
+ }
+
+ /**
+ * Update a flow configuration
+ * @param flowConfig flow configuration attributes
+ * @throws RemoteInvocationException
+ */
+ public void updateFlowConfig(FlowConfig flowConfig)
+ throws RemoteInvocationException {
+ LOG.debug("updateFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
+ flowConfig.getId().getFlowName());
+
+ FlowId flowId = new FlowId().setFlowGroup(flowConfig.getId().getFlowGroup())
+ .setFlowName(flowConfig.getId().getFlowName());
+
+ UpdateRequest<FlowConfig> updateRequest =
+ _flowconfigsV2RequestBuilders.update().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
+ .input(flowConfig).build();
+
+ ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest);
+
+ response.getResponse();
+ }
+
+ /**
+ * Get a flow configuration
+ * @param flowId identifier of flow configuration to get
+ * @return a {@link FlowConfig} with the flow configuration
+ * @throws RemoteInvocationException
+ */
+ public FlowConfig getFlowConfig(FlowId flowId)
+ throws RemoteInvocationException {
+ LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+ flowId.getFlowName());
+
+ GetRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.get()
+ .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
+
+ Response<FlowConfig> response =
+ _restClient.get().sendRequest(getRequest).getResponse();
+ return response.getEntity();
+ }
+
+ /**
+ * Delete a flow configuration
+ * @param flowId identifier of flow configuration to delete
+ * @throws RemoteInvocationException
+ */
+ public void deleteFlowConfig(FlowId flowId)
+ throws RemoteInvocationException {
+ LOG.debug("deleteFlowConfig with groupName {}, flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
+
+ DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
+ .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
+ ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
+
+ response.getResponse();
+ }
+
+ /**
+ * Delete a flow configuration
+ * @param flowId identifier of flow configuration to delete
+ * @throws RemoteInvocationException
+ */
+ public void deleteFlowConfigWithStateStore(FlowId flowId)
+ throws RemoteInvocationException {
+ LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " +
+ flowId.getFlowName());
+
+ DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
+ .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build();
+ ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
+
+ response.getResponse();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ if (_restClient.isPresent()) {
+ _restClient.get().shutdown(new FutureCallback<None>());
+ }
+
+ if (_httpClientFactory.isPresent()) {
+ _httpClientFactory.get().shutdown(new FutureCallback<None>());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
new file mode 100644
index 0000000..821b947
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.restli.server.resources.BaseResource;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+
+@Test(groups = { "gobblin.service" })
+public class FlowConfigV2Test {
+ private FlowConfigV2Client _client;
+ private EmbeddedRestliServer _server;
+ private File _testDirectory;
+
+ private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigTest/";
+ private static final String TEST_GROUP_NAME = "testGroup1";
+ private static final String TEST_FLOW_NAME = "testFlow1";
+ private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
+ private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+
+ _testDirectory = Files.createTempDir();
+
+ configBuilder
+ .addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
+ .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+ cleanUpDir(TEST_SPEC_STORE_DIR);
+
+ Config config = configBuilder.build();
+ final FlowCatalog flowCatalog = new FlowCatalog(config);
+
+ flowCatalog.startAsync();
+ flowCatalog.awaitRunning();
+
+ Injector injector = Guice.createInjector(new Module() {
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
+ // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
+ // been made
+ binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
+ }
+ });
+
+ _server = EmbeddedRestliServer.builder().resources(
+ Lists.<Class<? extends BaseResource>>newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
+
+ _server.startAsync();
+ _server.awaitRunning();
+
+ _client =
+ new FlowConfigV2Client(String.format("http://localhost:%s/", _server.getPort()));
+ }
+
+ protected void cleanUpDir(String dir) throws Exception {
+ File specStoreDir = new File(dir);
+ if (specStoreDir.exists()) {
+ FileUtils.deleteDirectory(specStoreDir);
+ }
+ }
+
+ @Test
+ public void testCheckFlowExecutionId() throws Exception {
+ Map<String, String> flowProperties = Maps.newHashMap();
+ flowProperties.put("param1", "value1");
+
+ FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+ FlowStatusId flowStatusId =_client.createFlowConfig(flowConfig);
+ Assert.assertEquals(TEST_GROUP_NAME, flowStatusId.getFlowGroup());
+ Assert.assertEquals(TEST_FLOW_NAME, flowStatusId.getFlowName());
+ Assert.assertTrue(flowStatusId.getFlowExecutionId() != -1);
+
+ flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties))
+ .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true));
+ Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(), -1L);
+ }
+
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (_client != null) {
+ _client.close();
+ }
+ if (_server != null) {
+ _server.stopAsync();
+ _server.awaitTerminated();
+ }
+ _testDirectory.delete();
+ cleanUpDir(TEST_SPEC_STORE_DIR);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 0007b6a..52cc0c7 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -48,7 +48,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@Slf4j
public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
@Getter
- private FlowCatalog flowCatalog;
+ protected FlowCatalog flowCatalog;
public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
this.flowCatalog = flowCatalog;
}
@@ -187,6 +187,14 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
Schedule schedule = flowConfig.getSchedule();
configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, schedule.getCronSchedule());
configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, schedule.isRunImmediately());
+ } else {
+ // If the job does not have schedule, it is a run-once job.
+ // In this case, we add flow execution id to the flow spec now to be able to send this id back to the user for
+ // flow status tracking purpose.
+ // If it is not a run-once job, we should not add flow execution id here,
+ // because execution id is generated for every scheduled execution of the flow and cannot be materialized to
+ // the flow catalog. In this case, this id is added during flow compilation.
+ configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis()));
}
Config config = configBuilder.build();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
new file mode 100644
index 0000000..bd77858
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+@Slf4j
+public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+ public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
+ super(flowCatalog);
+ }
+ @Override
+ /**
+ * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
+ */
+ public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException {
+ log.info("[GAAS-REST] Create called with flowGroup " + flowConfig.getId().getFlowGroup() + " flowName " + flowConfig.getId().getFlowName());
+ FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+ this.flowCatalog.put(flowSpec, triggerListener);
+ FlowStatusId flowStatusId = new FlowStatusId()
+ .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+ .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+ if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
+ } else {
+ flowStatusId.setFlowExecutionId(-1L);
+ }
+ return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), HttpStatus.S_201_CREATED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
new file mode 100644
index 0000000..7053e39
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.RestLiCollection;
+import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+/**
+ * Resource for handling flow configuration requests
+ */
+@RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id")
+public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class);
+ private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
+
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL")
+ public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null;
+
+ @Inject
+ @Named("flowConfigsV2ResourceHandler")
+ private FlowConfigsResourceHandler flowConfigsResourceHandler;
+
+ // For blocking use of this resource until it is ready
+ @Inject
+ @Named("readyToUse")
+ private Boolean readyToUse = Boolean.FALSE;
+
+ public FlowConfigsV2Resource() {
+ }
+
+ /**
+ * Retrieve the flow configuration with the given key
+ * @param key flow config id key containing group name and flow name
+ * @return {@link FlowConfig} with flow configuration
+ */
+ @Override
+ public FlowConfig get(ComplexResourceKey<FlowId, FlowStatusId> key) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ return this.getFlowConfigResourceHandler().getFlowConfig(flowId);
+ }
+
+ /**
+ * Create a flow configuration that the service will forward to execution instances for execution
+ * @param flowConfig flow configuration
+ * @return {@link CreateResponse}
+ */
+ @Override
+ public CreateResponse create(FlowConfig flowConfig) {
+ return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
+ }
+
+ /**
+ * Update the flow configuration with the specified key. Running flows are not affected.
+ * An error is raised if the flow configuration does not exist.
+ * @param key composite key containing group name and flow name that identifies the flow to update
+ * @param flowConfig new flow configuration
+ * @return {@link UpdateResponse}
+ */
+ @Override
+ public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, flowConfig);
+ }
+
+ /**
+ * Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows.
+ * @param key composite key containing flow group and flow name that identifies the flow to remove from the flow catalog
+ * @return {@link UpdateResponse}
+ */
+ @Override
+ public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders());
+ }
+
+ private FlowConfigsResourceHandler getFlowConfigResourceHandler() {
+ if (global_flowConfigsResourceHandler != null) {
+ return global_flowConfigsResourceHandler;
+ }
+ return flowConfigsResourceHandler;
+ }
+
+ private Properties getHeaders() {
+ Properties headerProperties = new Properties();
+ for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) {
+ if (ALLOWED_METADATA.contains(entry.getKey())) {
+ headerProperties.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return headerProperties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 071c126..9c1b42f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigsResource;
import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowId;
@@ -122,6 +123,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
protected GobblinServiceJobScheduler scheduler;
@Getter
protected GobblinServiceFlowConfigResourceHandler resourceHandler;
+ @Getter
+ protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
protected boolean flowCatalogLocalCommit;
protected Orchestrator orchestrator;
@@ -219,13 +222,21 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.helixManager,
this.scheduler);
+ this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
+ this.flowCatalogLocalCommit,
+ new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
+ this.helixManager,
+ this.scheduler);
+
this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
+
if (isRestLIServerEnabled) {
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler);
+ binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(GobblinServiceManager.this.v2ResourceHandler);
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 6604676..adef5cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import javax.annotation.Nonnull;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
@@ -33,32 +35,30 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.commons.lang3.StringUtils;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-
-import lombok.Getter;
-import lombok.Setter;
// Provide base implementation for constructing multi-hops route.
@Alpha
@@ -250,7 +250,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
}
// Add flow execution id for this compilation
- long flowExecutionId = System.currentTimeMillis();
+ long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId)));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
new file mode 100644
index 0000000..2743d27
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+public class FlowUtils {
+ /**
+ * A FlowSpec contains a FlowExecutionId if it is a runOnce flow.
+ * Refer {@link FlowConfigResourceLocalHandler#createFlowSpecForConfig} for details.
+ * @param spec flow spec
+ * @return flow execution id
+ */
+ public static long getOrCreateFlowExecutionId(FlowSpec spec) {
+ long flowExecutionId;
+ if (spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ flowExecutionId = spec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ } else {
+ flowExecutionId = System.currentTimeMillis();
+ }
+ return flowExecutionId;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 236f927..6b0b14a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.flow;
-import com.google.common.base.Splitter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -28,41 +27,43 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+import org.slf4j.Logger;
+
import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.commons.lang3.StringUtils;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.policy.ServicePolicy;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.slf4j.Logger;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.policy.ServicePolicy;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
-import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
+import static org.apache.gobblin.service.ServiceConfigKeys.SERVICE_POLICY_NAME;
+import static org.apache.gobblin.service.modules.utils.FindPathUtils.dijkstraBasedPathFindingHelper;
// Users are capable to inject hints/prioritization into route selection, in two forms:
// 1. PolicyBasedBlockedConnection: Define some undesired routes
@@ -309,7 +310,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
}
// Add flow execution id for this compilation
- long flowExecutionId = System.currentTimeMillis();
+ long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
ConfigValueFactory.fromAnyRef(flowExecutionId)));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 9901c2f..6c1d0b2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -78,6 +79,7 @@ public abstract class AbstractPathFinder implements PathFinder {
this.flowGraph = flowGraph;
this.flowSpec = flowSpec;
this.flowConfig = flowSpec.getConfig();
+ this.flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
//Get src/dest DataNodes from the flow config
String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
@@ -237,8 +239,6 @@ public abstract class AbstractPathFinder implements PathFinder {
@Override
public FlowGraphPath findPath() throws PathFinderException {
- // Generate flow execution id for this compilation
- this.flowExecutionId = System.currentTimeMillis();
FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
//Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the