You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/08 00:29:19 UTC

incubator-gobblin git commit: [GOBBLIN-538] Flow config v2 resource

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c2d59a1cb -> 1896d7fab


[GOBBLIN-538] Flow config v2 resource

Closes #2431 from
arjun4084346/FlowConfigV2Resource


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1896d7fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1896d7fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1896d7fa

Branch: refs/heads/master
Commit: 1896d7fab5ec43ac248fdb5bebf0218bed156d00
Parents: c2d59a1
Author: Arjun <ab...@linkedin.com>
Authored: Fri Sep 7 17:29:14 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Sep 7 17:29:14 2018 -0700

----------------------------------------------------------------------
 ....gobblin.service.flowconfigsV2.restspec.json |  31 +++
 .../gobblin/service/FlowConfigV2Client.java     | 213 +++++++++++++++++++
 .../gobblin/service/FlowConfigV2Test.java       | 134 ++++++++++++
 .../service/FlowConfigResourceLocalHandler.java |  10 +-
 .../FlowConfigV2ResourceLocalHandler.java       |  50 +++++
 .../gobblin/service/FlowConfigsV2Resource.java  | 122 +++++++++++
 .../modules/core/GobblinServiceManager.java     |  11 +
 .../modules/flow/BaseFlowToJobSpecCompiler.java |  34 +--
 .../gobblin/service/modules/flow/FlowUtils.java |  36 ++++
 .../flow/MultiHopsFlowToJobSpecCompiler.java    |  47 ++--
 .../pathfinder/AbstractPathFinder.java          |   4 +-
 11 files changed, 649 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
new file mode 100644
index 0000000..e45718e
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -0,0 +1,31 @@
+{
+  "name" : "flowconfigsV2",
+  "namespace" : "org.apache.gobblin.service",
+  "path" : "/flowconfigsV2",
+  "schema" : "org.apache.gobblin.service.FlowConfig",
+  "doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsV2Resource",
+  "collection" : {
+    "identifier" : {
+      "name" : "id",
+      "type" : "org.apache.gobblin.service.FlowId",
+      "params" : "org.apache.gobblin.service.FlowStatusId"
+    },
+    "supports" : [ "create", "delete", "get", "update" ],
+    "methods" : [ {
+      "method" : "create",
+      "doc" : "Create a flow configuration that the service will forward to execution instances for execution"
+    }, {
+      "method" : "get",
+      "doc" : "Retrieve the flow configuration with the given key"
+    }, {
+      "method" : "update",
+      "doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
+    }, {
+      "method" : "delete",
+      "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+    } ],
+    "entity" : {
+      "path" : "/flowconfigsV2/{id}"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
new file mode 100644
index 0000000..1471883
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.linkedin.common.callback.FutureCallback;
+import com.linkedin.common.util.None;
+import com.linkedin.r2.RemoteInvocationException;
+import com.linkedin.r2.transport.common.Client;
+import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.CreateIdRequest;
+import com.linkedin.restli.client.DeleteRequest;
+import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.Response;
+import com.linkedin.restli.client.ResponseFuture;
+import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.UpdateRequest;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.IdResponse;
+
+
+/**
+ * Flow Configuration client for REST flow configuration server
+ */
+public class FlowConfigV2Client implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FlowConfigV2Client.class);
+
+  private Optional<HttpClientFactory> _httpClientFactory;
+  private Optional<RestClient> _restClient;
+  private final FlowconfigsV2RequestBuilders _flowconfigsV2RequestBuilders;
+  public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
+  private static final Pattern flowStatusIdParams = Pattern.compile(".*params:\\((?<flowStatusIdParams>.*?)\\)");
+
+  /**
+   * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri
+   * @param serverUri address and port of the REST server
+   */
+  public FlowConfigV2Client(String serverUri) {
+    LOG.debug("FlowConfigClient with serverUri " + serverUri);
+
+    _httpClientFactory = Optional.of(new HttpClientFactory());
+    Client r2Client = new TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, String>emptyMap()));
+    _restClient = Optional.of(new RestClient(r2Client, serverUri));
+
+    _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders();
+  }
+
+  /**
+   * Construct a {@link FlowConfigV2Client} to communicate with http flow config server at URI serverUri
+   * @param restClient restClient to send restli request
+   */
+  public FlowConfigV2Client(RestClient restClient) {
+    LOG.debug("FlowConfigV2Client with restClient " + restClient);
+
+    _httpClientFactory = Optional.absent();
+    _restClient = Optional.of(restClient);
+
+    _flowconfigsV2RequestBuilders = new FlowconfigsV2RequestBuilders();
+  }
+
+  /**
+   * Create a flow configuration
+   * It differs from {@link FlowConfigClient} in a way that it returns FlowStatusId,
+   * which can be used to find the FlowExecutionId
+   * @param flowConfig FlowConfig to be used to create the flow
+   * @return FlowStatusId
+   * @throws RemoteInvocationException
+   */
+  public FlowStatusId createFlowConfig(FlowConfig flowConfig)
+      throws RemoteInvocationException {
+    LOG.debug("createFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
+        flowConfig.getId().getFlowName());
+
+    CreateIdRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> request =
+        _flowconfigsV2RequestBuilders.create().input(flowConfig).build();
+    ResponseFuture<IdResponse<ComplexResourceKey<FlowId, FlowStatusId>>> flowConfigResponseFuture =
+        _restClient.get().sendRequest(request);
+
+    return createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
+  }
+
+  private FlowStatusId createFlowStatusId(String locationHeader) {
+    Matcher matcher = flowStatusIdParams.matcher(locationHeader);
+    matcher.find();
+    String allFields = matcher.group("flowStatusIdParams");
+    String[] flowStatusIdParams = allFields.split(",");
+    Map<String, String> paramsMap = new HashMap<>();
+    for (String flowStatusIdParam : flowStatusIdParams) {
+      paramsMap.put(flowStatusIdParam.split(":")[0], flowStatusIdParam.split(":")[1]);
+    }
+    FlowStatusId flowStatusId = new FlowStatusId()
+        .setFlowName(paramsMap.get("flowName"))
+        .setFlowGroup(paramsMap.get("flowGroup"));
+    if (paramsMap.containsKey("flowExecutionId")) {
+      flowStatusId.setFlowExecutionId(Long.parseLong(paramsMap.get("flowExecutionId")));
+    }
+    return flowStatusId;
+  }
+
+  /**
+   * Update a flow configuration
+   * @param flowConfig flow configuration attributes
+   * @throws RemoteInvocationException
+   */
+  public void updateFlowConfig(FlowConfig flowConfig)
+      throws RemoteInvocationException {
+    LOG.debug("updateFlowConfig with groupName " + flowConfig.getId().getFlowGroup() + " flowName " +
+        flowConfig.getId().getFlowName());
+
+    FlowId flowId = new FlowId().setFlowGroup(flowConfig.getId().getFlowGroup())
+        .setFlowName(flowConfig.getId().getFlowName());
+
+    UpdateRequest<FlowConfig> updateRequest =
+        _flowconfigsV2RequestBuilders.update().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
+            .input(flowConfig).build();
+
+    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest);
+
+    response.getResponse();
+  }
+
+  /**
+   * Get a flow configuration
+   * @param flowId identifier of flow configuration to get
+   * @return a {@link FlowConfig} with the flow configuration
+   * @throws RemoteInvocationException
+   */
+  public FlowConfig getFlowConfig(FlowId flowId)
+      throws RemoteInvocationException {
+    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
+        flowId.getFlowName());
+
+    GetRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.get()
+        .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
+
+    Response<FlowConfig> response =
+        _restClient.get().sendRequest(getRequest).getResponse();
+    return response.getEntity();
+  }
+
+  /**
+   * Delete a flow configuration
+   * @param flowId identifier of flow configuration to delete
+   * @throws RemoteInvocationException
+   */
+  public void deleteFlowConfig(FlowId flowId)
+      throws RemoteInvocationException {
+    LOG.debug("deleteFlowConfig with groupName {}, flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
+
+    DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
+        .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
+    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
+
+    response.getResponse();
+  }
+
+  /**
+   * Delete a flow configuration
+   * @param flowId identifier of flow configuration to delete
+   * @throws RemoteInvocationException
+   */
+  public void deleteFlowConfigWithStateStore(FlowId flowId)
+      throws RemoteInvocationException {
+    LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " +
+        flowId.getFlowName());
+
+    DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
+        .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build();
+    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
+
+    response.getResponse();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (_restClient.isPresent()) {
+      _restClient.get().shutdown(new FutureCallback<None>());
+    }
+
+    if (_httpClientFactory.isPresent()) {
+      _httpClientFactory.get().shutdown(new FutureCallback<None>());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
new file mode 100644
index 0000000..821b947
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.restli.server.resources.BaseResource;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+
+@Test(groups = { "gobblin.service" })
+public class FlowConfigV2Test {
+  private FlowConfigV2Client _client;
+  private EmbeddedRestliServer _server;
+  private File _testDirectory;
+
+  private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigTest/";
+  private static final String TEST_GROUP_NAME = "testGroup1";
+  private static final String TEST_FLOW_NAME = "testFlow1";
+  private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
+  private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+
+    _testDirectory = Files.createTempDir();
+
+    configBuilder
+        .addPrimitive(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, _testDirectory.getAbsolutePath())
+        .addPrimitive(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, TEST_SPEC_STORE_DIR);
+    cleanUpDir(TEST_SPEC_STORE_DIR);
+
+    Config config = configBuilder.build();
+    final FlowCatalog flowCatalog = new FlowCatalog(config);
+
+    flowCatalog.startAsync();
+    flowCatalog.awaitRunning();
+
+    Injector injector = Guice.createInjector(new Module() {
+      @Override
+      public void configure(Binder binder) {
+        binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
+        // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
+        // been made
+        binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
+      }
+    });
+
+    _server = EmbeddedRestliServer.builder().resources(
+        Lists.<Class<? extends BaseResource>>newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
+
+    _server.startAsync();
+    _server.awaitRunning();
+
+    _client =
+        new FlowConfigV2Client(String.format("http://localhost:%s/", _server.getPort()));
+  }
+
+  protected void cleanUpDir(String dir) throws Exception {
+    File specStoreDir = new File(dir);
+    if (specStoreDir.exists()) {
+      FileUtils.deleteDirectory(specStoreDir);
+    }
+  }
+
+  @Test
+  public void testCheckFlowExecutionId() throws Exception {
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+    FlowStatusId flowStatusId =_client.createFlowConfig(flowConfig);
+    Assert.assertEquals(TEST_GROUP_NAME, flowStatusId.getFlowGroup());
+    Assert.assertEquals(TEST_FLOW_NAME, flowStatusId.getFlowName());
+    Assert.assertTrue(flowStatusId.getFlowExecutionId() != -1);
+
+    flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties))
+        .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true));
+    Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(), -1L);
+  }
+
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    if (_client != null) {
+      _client.close();
+    }
+    if (_server != null) {
+      _server.stopAsync();
+      _server.awaitTerminated();
+    }
+    _testDirectory.delete();
+    cleanUpDir(TEST_SPEC_STORE_DIR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 0007b6a..52cc0c7 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -48,7 +48,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 @Slf4j
 public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
   @Getter
-  private FlowCatalog flowCatalog;
+  protected FlowCatalog flowCatalog;
   public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
     this.flowCatalog = flowCatalog;
   }
@@ -187,6 +187,14 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       Schedule schedule = flowConfig.getSchedule();
       configBuilder.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, schedule.getCronSchedule());
       configBuilder.addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, schedule.isRunImmediately());
+    } else {
+      // If the job does not have schedule, it is a run-once job.
+      // In this case, we add flow execution id to the flow spec now to be able to send this id back to the user for
+      // flow status tracking purpose.
+      // If it is not a run-once job, we should not add flow execution id here,
+      // because execution id is generated for every scheduled execution of the flow and cannot be materialized to
+      // the flow catalog. In this case, this id is added during flow compilation.
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis()));
     }
 
     Config config = configBuilder.build();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
new file mode 100644
index 0000000..bd77858
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.CreateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+@Slf4j
+public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+  public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
+    super(flowCatalog);
+  }
+  @Override
+  /**
+   * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
+   */
+  public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException {
+    log.info("[GAAS-REST] Create called with flowGroup " + flowConfig.getId().getFlowGroup() + " flowName " + flowConfig.getId().getFlowName());
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    this.flowCatalog.put(flowSpec, triggerListener);
+    FlowStatusId flowStatusId = new FlowStatusId()
+        .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+        .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+    if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
+    } else {
+      flowStatusId.setFlowExecutionId(-1L);
+    }
+    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), HttpStatus.S_201_CREATED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
new file mode 100644
index 0000000..7053e39
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.RestLiCollection;
+import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+/**
+ * Resource for handling flow configuration requests
+ */
+@RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id")
+public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class);
+  private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
+
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL")
+  public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null;
+
+  @Inject
+  @Named("flowConfigsV2ResourceHandler")
+  private FlowConfigsResourceHandler flowConfigsResourceHandler;
+
+  // For blocking use of this resource until it is ready
+  @Inject
+  @Named("readyToUse")
+  private Boolean readyToUse = Boolean.FALSE;
+
+  public FlowConfigsV2Resource() {
+  }
+
+  /**
+   * Retrieve the flow configuration with the given key
+   * @param key flow config id key containing group name and flow name
+   * @return {@link FlowConfig} with flow configuration
+   */
+  @Override
+  public FlowConfig get(ComplexResourceKey<FlowId, FlowStatusId> key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().getFlowConfig(flowId);
+  }
+
+  /**
+   * Create a flow configuration that the service will forward to execution instances for execution
+   * @param flowConfig flow configuration
+   * @return {@link CreateResponse}
+   */
+  @Override
+  public CreateResponse create(FlowConfig flowConfig) {
+    return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig);
+  }
+
+  /**
+   * Update the flow configuration with the specified key. Running flows are not affected.
+   * An error is raised if the flow configuration does not exist.
+   * @param key composite key containing group name and flow name that identifies the flow to update
+   * @param flowConfig new flow configuration
+   * @return {@link UpdateResponse}
+   */
+  @Override
+  public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, flowConfig);
+  }
+
+  /**
+   * Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows.
+   * @param key composite key containing flow group and flow name that identifies the flow to remove from the flow catalog
+   * @return {@link UpdateResponse}
+   */
+  @Override
+  public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders());
+  }
+
+  private FlowConfigsResourceHandler getFlowConfigResourceHandler() {
+    if (global_flowConfigsResourceHandler != null) {
+      return global_flowConfigsResourceHandler;
+    }
+    return flowConfigsResourceHandler;
+  }
+
+  private Properties getHeaders() {
+    Properties headerProperties = new Properties();
+    for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) {
+      if (ALLOWED_METADATA.contains(entry.getKey())) {
+        headerProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return headerProperties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 071c126..9c1b42f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -78,6 +78,7 @@ import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigsResource;
 import org.apache.gobblin.service.FlowConfigsResourceHandler;
 import org.apache.gobblin.service.FlowId;
@@ -122,6 +123,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   protected GobblinServiceJobScheduler scheduler;
   @Getter
   protected GobblinServiceFlowConfigResourceHandler resourceHandler;
+  @Getter
+  protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
 
   protected boolean flowCatalogLocalCommit;
   protected Orchestrator orchestrator;
@@ -219,13 +222,21 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         this.helixManager,
         this.scheduler);
 
+    this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
+        this.flowCatalogLocalCommit,
+        new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
+        this.helixManager,
+        this.scheduler);
+
     this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
+
     if (isRestLIServerEnabled) {
       Injector injector = Guice.createInjector(new Module() {
         @Override
         public void configure(Binder binder) {
           binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler);
+          binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(GobblinServiceManager.this.v2ResourceHandler);
           binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 6604676..adef5cc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import javax.annotation.Nonnull;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -33,32 +35,30 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.commons.lang3.StringUtils;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-
-import lombok.Getter;
-import lombok.Setter;
 
 // Provide base implementation for constructing multi-hops route.
 @Alpha
@@ -250,7 +250,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     }
 
     // Add flow execution id for this compilation
-    long flowExecutionId = System.currentTimeMillis();
+    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
     jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
         ConfigValueFactory.fromAnyRef(flowExecutionId)));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
new file mode 100644
index 0000000..2743d27
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flow;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+public class FlowUtils {
+  /**
+   * A FlowSpec contains a FlowExecutionId if it is a runOnce flow.
+   * Refer {@link FlowConfigResourceLocalHandler#createFlowSpecForConfig} for details.
+   * @param spec flow spec
+   * @return flow execution id
+   */
+  public static long getOrCreateFlowExecutionId(FlowSpec spec) {
+    long flowExecutionId;
+    if (spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      flowExecutionId = spec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    } else {
+      flowExecutionId = System.currentTimeMillis();
+    }
+    return flowExecutionId;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index 236f927..6b0b14a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.flow;
 
-import com.google.common.base.Splitter;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -28,41 +27,43 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
+import org.jgrapht.graph.DirectedWeightedMultigraph;
+import org.slf4j.Logger;
+
 import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.commons.lang3.StringUtils;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.policy.ServicePolicy;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.slf4j.Logger;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.ServiceNode;
+import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
+import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.policy.ServicePolicy;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
-import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
+import static org.apache.gobblin.service.ServiceConfigKeys.SERVICE_POLICY_NAME;
+import static org.apache.gobblin.service.modules.utils.FindPathUtils.dijkstraBasedPathFindingHelper;
 
 // Users are capable to inject hints/prioritization into route selection, in two forms:
 // 1. PolicyBasedBlockedConnection: Define some undesired routes
@@ -309,7 +310,7 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
     }
 
     // Add flow execution id for this compilation
-    long flowExecutionId = System.currentTimeMillis();
+    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
     jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
         ConfigValueFactory.fromAnyRef(flowExecutionId)));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1896d7fa/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 9901c2f..6c1d0b2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
 import org.apache.gobblin.service.modules.flow.FlowGraphPath;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -78,6 +79,7 @@ public abstract class AbstractPathFinder implements PathFinder {
     this.flowGraph = flowGraph;
     this.flowSpec = flowSpec;
     this.flowConfig = flowSpec.getConfig();
+    this.flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
 
     //Get src/dest DataNodes from the flow config
     String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "");
@@ -237,8 +239,6 @@ public abstract class AbstractPathFinder implements PathFinder {
 
   @Override
   public FlowGraphPath findPath() throws PathFinderException {
-    // Generate flow execution id for this compilation
-    this.flowExecutionId = System.currentTimeMillis();
 
     FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
     //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the