You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/11/03 22:14:13 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f276724  [GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client
f276724 is described below

commit f2767242fe863307feaf68332f0d3617f30e8e4b
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Nov 3 14:14:06 2020 -0800

    [GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client
    
    Closes #3124 from jack-moseley/gaas-redirect
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   7 +
 gobblin-restli/client.gradle                       |   3 +
 ...pache.gobblin.service.flowconfigs.snapshot.json | 106 ++++---
 ...che.gobblin.service.flowconfigsV2.snapshot.json |  94 +++---
 ...he.gobblin.service.flowexecutions.snapshot.json | 332 ++++++++++-----------
 ...ache.gobblin.service.flowstatuses.snapshot.json | 264 ++++++++--------
 .../apache/gobblin/service/FlowClientUtils.java    |  66 ++++
 .../apache/gobblin/service/FlowConfigV2Client.java |  16 +-
 .../gobblin/service/FlowExecutionClient.java       |  16 +
 .../gobblin/service/FlowExecutionResource.java     | 139 +--------
 .../service/FlowExecutionResourceHandler.java      |  52 ++++
 ...java => FlowExecutionResourceLocalHandler.java} |  74 ++---
 .../apache/gobblin/service/FlowStatusResource.java |   7 +-
 .../service/monitoring/FlowStatusGenerator.java    |   9 -
 .../modules/core/GobblinServiceManager.java        |  31 +-
 .../GobblinServiceFlowConfigResourceHandler.java   |  17 +-
 ...GobblinServiceFlowExecutionResourceHandler.java |  83 ++++++
 .../gobblin/service/modules/utils/HelixUtils.java  |  82 ++++-
 .../modules/core/GobblinServiceRedirectTest.java   | 263 ++++++++++++++++
 19 files changed, 1049 insertions(+), 612 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index e161b78..6aefffb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -115,6 +115,8 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
   // GAAS Listerning Port
   public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port";
+  public static final String SERVICE_NAME = GOBBLIN_SERVICE_PREFIX + "serviceName";
+  public static final String SERVICE_URL_PREFIX = GOBBLIN_SERVICE_PREFIX + "serviceUrlPrefix";
 
   // Prefix for config to ServiceBasedAppLauncher that will only be used by GaaS and not orchestrated jobs
   public static final String GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX = "gobblinServiceAppLauncher";
@@ -122,4 +124,9 @@ public class ServiceConfigKeys {
   //Flow concurrency config key to control default service behavior.
   public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
   public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
+
+  public static final String LEADER_URL = "leaderUrl";
+
+  public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + "forceLeader";
+  public static final boolean DEFAULT_FORCE_LEADER = false;
 }
diff --git a/gobblin-restli/client.gradle b/gobblin-restli/client.gradle
index 215c17d..ca13aeb 100644
--- a/gobblin-restli/client.gradle
+++ b/gobblin-restli/client.gradle
@@ -47,6 +47,9 @@ if (file('extraDependencies.gradle').exists()) {
 dependencies {
   compile project(path: apiProject.path, configuration: 'dataTemplate')
   compile project(path: apiProject.path, configuration: 'restClient')
+  compile(project(':gobblin-api')) {
+    transitive = false
+  }
 
   testCompile project(":gobblin-restli:gobblin-restli-utils")
   testCompile externalDependency.testng
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 9485f8d..86c9ee4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -1,48 +1,13 @@
 {
   "models" : [ {
     "type" : "record",
-    "name" : "FlowId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a Gobblin as a Service flow",
-    "fields" : [ {
-      "name" : "flowName",
-      "type" : "string",
-      "doc" : "Name of the flow",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    }, {
-      "name" : "flowGroup",
-      "type" : "string",
-      "doc" : "Group of the flow. This defines the namespace for the flow.",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "Schedule",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Attributes for defining a job schedule",
-    "fields" : [ {
-      "name" : "cronSchedule",
-      "type" : "string",
-      "doc" : "Schedule for flow in cron format",
-      "validate" : {
-        "org.apache.gobblin.service.validator.CronValidator" : { }
-      }
-    }, {
-      "name" : "runImmediately",
-      "type" : "boolean",
-      "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
-      "default" : false
-    } ]
+    "name" : "EmptyRecord",
+    "namespace" : "com.linkedin.restli.common",
+    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+    "fields" : [ ],
+    "validate" : {
+      "com.linkedin.restli.common.EmptyRecordValidator" : { }
+    }
   }, {
     "type" : "record",
     "name" : "FlowConfig",
@@ -50,11 +15,53 @@
     "doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
     "fields" : [ {
       "name" : "id",
-      "type" : "FlowId",
+      "type" : {
+        "type" : "record",
+        "name" : "FlowId",
+        "doc" : "Identifier for a Gobblin as a Service flow",
+        "fields" : [ {
+          "name" : "flowName",
+          "type" : "string",
+          "doc" : "Name of the flow",
+          "validate" : {
+            "strlen" : {
+              "max" : 128,
+              "min" : 1
+            }
+          }
+        }, {
+          "name" : "flowGroup",
+          "type" : "string",
+          "doc" : "Group of the flow. This defines the namespace for the flow.",
+          "validate" : {
+            "strlen" : {
+              "max" : 128,
+              "min" : 1
+            }
+          }
+        } ]
+      },
       "doc" : "Identifier for the flow"
     }, {
       "name" : "schedule",
-      "type" : "Schedule",
+      "type" : {
+        "type" : "record",
+        "name" : "Schedule",
+        "doc" : "Attributes for defining a job schedule",
+        "fields" : [ {
+          "name" : "cronSchedule",
+          "type" : "string",
+          "doc" : "Schedule for flow in cron format",
+          "validate" : {
+            "org.apache.gobblin.service.validator.CronValidator" : { }
+          }
+        }, {
+          "name" : "runImmediately",
+          "type" : "boolean",
+          "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
+          "default" : false
+        } ]
+      },
       "doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.",
       "optional" : true
     }, {
@@ -77,16 +84,7 @@
       },
       "doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
     } ]
-  }, {
-    "type" : "record",
-    "name" : "EmptyRecord",
-    "namespace" : "com.linkedin.restli.common",
-    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
-    "fields" : [ ],
-    "validate" : {
-      "com.linkedin.restli.common.EmptyRecordValidator" : { }
-    }
-  } ],
+  }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.Schedule" ],
   "schema" : {
     "name" : "flowconfigs",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index fa8e0cd..13da7a5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -1,60 +1,58 @@
 {
   "models" : [ {
     "type" : "record",
-    "name" : "FlowId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a Gobblin as a Service flow",
-    "fields" : [ {
-      "name" : "flowName",
-      "type" : "string",
-      "doc" : "Name of the flow",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    }, {
-      "name" : "flowGroup",
-      "type" : "string",
-      "doc" : "Group of the flow. This defines the namespace for the flow.",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "Schedule",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Attributes for defining a job schedule",
-    "fields" : [ {
-      "name" : "cronSchedule",
-      "type" : "string",
-      "doc" : "Schedule for flow in cron format",
-      "validate" : {
-        "org.apache.gobblin.service.validator.CronValidator" : { }
-      }
-    }, {
-      "name" : "runImmediately",
-      "type" : "boolean",
-      "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
-      "default" : false
-    } ]
-  }, {
-    "type" : "record",
     "name" : "FlowConfig",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
     "fields" : [ {
       "name" : "id",
-      "type" : "FlowId",
+      "type" : {
+        "type" : "record",
+        "name" : "FlowId",
+        "doc" : "Identifier for a Gobblin as a Service flow",
+        "fields" : [ {
+          "name" : "flowName",
+          "type" : "string",
+          "doc" : "Name of the flow",
+          "validate" : {
+            "strlen" : {
+              "max" : 128,
+              "min" : 1
+            }
+          }
+        }, {
+          "name" : "flowGroup",
+          "type" : "string",
+          "doc" : "Group of the flow. This defines the namespace for the flow.",
+          "validate" : {
+            "strlen" : {
+              "max" : 128,
+              "min" : 1
+            }
+          }
+        } ]
+      },
       "doc" : "Identifier for the flow"
     }, {
       "name" : "schedule",
-      "type" : "Schedule",
+      "type" : {
+        "type" : "record",
+        "name" : "Schedule",
+        "doc" : "Attributes for defining a job schedule",
+        "fields" : [ {
+          "name" : "cronSchedule",
+          "type" : "string",
+          "doc" : "Schedule for flow in cron format",
+          "validate" : {
+            "org.apache.gobblin.service.validator.CronValidator" : { }
+          }
+        }, {
+          "name" : "runImmediately",
+          "type" : "boolean",
+          "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
+          "default" : false
+        } ]
+      },
       "doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.",
       "optional" : true
     }, {
@@ -77,7 +75,7 @@
       },
       "doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
     } ]
-  }, {
+  }, "org.apache.gobblin.service.FlowId", {
     "type" : "record",
     "name" : "FlowStatusId",
     "namespace" : "org.apache.gobblin.service",
@@ -95,7 +93,7 @@
       "type" : "long",
       "doc" : "Execution id for the flow"
     } ]
-  } ],
+  }, "org.apache.gobblin.service.Schedule" ],
   "schema" : {
     "name" : "flowconfigsV2",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index ed12a21..65321d6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -1,36 +1,13 @@
 {
   "models" : [ {
     "type" : "record",
-    "name" : "FlowStatusId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a specific execution of a flow",
-    "fields" : [ {
-      "name" : "flowName",
-      "type" : "string",
-      "doc" : "Name of the flow"
-    }, {
-      "name" : "flowGroup",
-      "type" : "string",
-      "doc" : "Group of the flow. This defines the namespace for the flow."
-    }, {
-      "name" : "flowExecutionId",
-      "type" : "long",
-      "doc" : "Execution id for the flow"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "FlowStatistics",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Statistics from a flow execution",
-    "fields" : [ {
-      "name" : "executionStartTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution began"
-    }, {
-      "name" : "executionEndTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution ended"
-    } ]
+    "name" : "EmptyRecord",
+    "namespace" : "com.linkedin.restli.common",
+    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+    "fields" : [ ],
+    "validate" : {
+      "com.linkedin.restli.common.EmptyRecordValidator" : { }
+    }
   }, {
     "type" : "enum",
     "name" : "ExecutionStatus",
@@ -38,146 +15,57 @@
     "doc" : "Execution status for a flow or job",
     "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
+      "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
-      "PENDING" : "Flow or job is in pending state.",
-      "PENDING_RETRY" : "Flow or job is pending retry.",
-      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
-      "RUNNING" : "Flow or job is currently executing",
       "COMPLETE" : "Flow or job completed execution",
       "FAILED" : "Flow or job failed",
-      "CANCELLED" : "Flow cancelled."
+      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+      "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RETRY" : "Flow or job is pending retry.",
+      "RUNNING" : "Flow or job is currently executing"
     }
   }, {
     "type" : "record",
-    "name" : "FlowId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a Gobblin as a Service flow",
-    "fields" : [ {
-      "name" : "flowName",
-      "type" : "string",
-      "doc" : "Name of the flow",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    }, {
-      "name" : "flowGroup",
-      "type" : "string",
-      "doc" : "Group of the flow. This defines the namespace for the flow.",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a Gobblin job",
-    "fields" : [ {
-      "name" : "jobName",
-      "type" : "string",
-      "doc" : "Name of the job",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 600
-        }
-      }
-    }, {
-      "name" : "jobGroup",
-      "type" : "string",
-      "doc" : "Group of the job. This defines the namespace for the job.",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobStatistics",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Statistics from a job execution",
-    "fields" : [ {
-      "name" : "executionStartTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution began"
-    }, {
-      "name" : "executionEndTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution ended"
-    }, {
-      "name" : "processedCount",
-      "type" : "long",
-      "doc" : "number of records processed in the last job execution"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobState",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Job state that is updated at the end of an execution",
-    "fields" : [ {
-      "name" : "lowWatermark",
-      "type" : "string",
-      "doc" : "Low watermark after last execution"
-    }, {
-      "name" : "highWatermark",
-      "type" : "string",
-      "doc" : "High watermark after last execution"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobStatus",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Execution status for a job",
-    "fields" : [ {
-      "name" : "flowId",
-      "type" : "FlowId",
-      "doc" : "Identifier of the flow the job belongs to"
-    }, {
-      "name" : "jobId",
-      "type" : "JobId",
-      "doc" : "Identifier of the job"
-    }, {
-      "name" : "jobTag",
-      "type" : "string",
-      "doc" : "Tag of the job",
-      "optional" : true
-    }, {
-      "name" : "executionStatus",
-      "type" : "ExecutionStatus",
-      "doc" : "Job execution status"
-    }, {
-      "name" : "message",
-      "type" : "string",
-      "doc" : "Error or status message"
-    }, {
-      "name" : "executionStatistics",
-      "type" : "JobStatistics",
-      "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
-    }, {
-      "name" : "jobState",
-      "type" : "JobState",
-      "doc" : "Job state that is updated only at the start and end of a job execution."
-    } ]
-  }, {
-    "type" : "record",
     "name" : "FlowExecution",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Represents an execution of a flow",
     "fields" : [ {
       "name" : "id",
-      "type" : "FlowStatusId",
+      "type" : {
+        "type" : "record",
+        "name" : "FlowStatusId",
+        "doc" : "Identifier for a specific execution of a flow",
+        "fields" : [ {
+          "name" : "flowName",
+          "type" : "string",
+          "doc" : "Name of the flow"
+        }, {
+          "name" : "flowGroup",
+          "type" : "string",
+          "doc" : "Group of the flow. This defines the namespace for the flow."
+        }, {
+          "name" : "flowExecutionId",
+          "type" : "long",
+          "doc" : "Execution id for the flow"
+        } ]
+      },
       "doc" : "Flow status identifier"
     }, {
       "name" : "executionStatistics",
-      "type" : "FlowStatistics",
+      "type" : {
+        "type" : "record",
+        "name" : "FlowStatistics",
+        "doc" : "Statistics from a flow execution",
+        "fields" : [ {
+          "name" : "executionStartTime",
+          "type" : "long",
+          "doc" : "Epoch time of when the execution began"
+        }, {
+          "name" : "executionEndTime",
+          "type" : "long",
+          "doc" : "Epoch time of when the execution ended"
+        } ]
+      },
       "doc" : "Statistics for this execution of the flow"
     }, {
       "name" : "executionStatus",
@@ -191,20 +79,130 @@
       "name" : "jobStatuses",
       "type" : {
         "type" : "array",
-        "items" : "JobStatus"
+        "items" : {
+          "type" : "record",
+          "name" : "JobStatus",
+          "doc" : "Execution status for a job",
+          "fields" : [ {
+            "name" : "flowId",
+            "type" : {
+              "type" : "record",
+              "name" : "FlowId",
+              "doc" : "Identifier for a Gobblin as a Service flow",
+              "fields" : [ {
+                "name" : "flowName",
+                "type" : "string",
+                "doc" : "Name of the flow",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 128,
+                    "min" : 1
+                  }
+                }
+              }, {
+                "name" : "flowGroup",
+                "type" : "string",
+                "doc" : "Group of the flow. This defines the namespace for the flow.",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 128,
+                    "min" : 1
+                  }
+                }
+              } ]
+            },
+            "doc" : "Identifier of the flow the job belongs to"
+          }, {
+            "name" : "jobId",
+            "type" : {
+              "type" : "record",
+              "name" : "JobId",
+              "doc" : "Identifier for a Gobblin job",
+              "fields" : [ {
+                "name" : "jobName",
+                "type" : "string",
+                "doc" : "Name of the job",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 600,
+                    "min" : 1
+                  }
+                }
+              }, {
+                "name" : "jobGroup",
+                "type" : "string",
+                "doc" : "Group of the job. This defines the namespace for the job.",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 128,
+                    "min" : 1
+                  }
+                }
+              } ]
+            },
+            "doc" : "Identifier of the job"
+          }, {
+            "name" : "jobTag",
+            "type" : "string",
+            "doc" : "Tag of the job",
+            "optional" : true
+          }, {
+            "name" : "executionStatus",
+            "type" : "ExecutionStatus",
+            "doc" : "Job execution status"
+          }, {
+            "name" : "message",
+            "type" : "string",
+            "doc" : "Error or status message"
+          }, {
+            "name" : "metrics",
+            "type" : "string",
+            "doc" : "metrics information",
+            "optional" : true
+          }, {
+            "name" : "executionStatistics",
+            "type" : {
+              "type" : "record",
+              "name" : "JobStatistics",
+              "doc" : "Statistics from a job execution",
+              "fields" : [ {
+                "name" : "executionStartTime",
+                "type" : "long",
+                "doc" : "Epoch time of when the execution began"
+              }, {
+                "name" : "executionEndTime",
+                "type" : "long",
+                "doc" : "Epoch time of when the execution ended"
+              }, {
+                "name" : "processedCount",
+                "type" : "long",
+                "doc" : "number of records processed in the last job execution"
+              } ]
+            },
+            "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
+          }, {
+            "name" : "jobState",
+            "type" : {
+              "type" : "record",
+              "name" : "JobState",
+              "doc" : "Job state that is updated at the end of an execution",
+              "fields" : [ {
+                "name" : "lowWatermark",
+                "type" : "string",
+                "doc" : "Low watermark after last execution"
+              }, {
+                "name" : "highWatermark",
+                "type" : "string",
+                "doc" : "High watermark after last execution"
+              } ]
+            },
+            "doc" : "Job state that is updated only at the start and end of a job execution."
+          } ]
+        }
       },
       "doc" : "Status of jobs belonging to the flow"
     } ]
-  }, {
-    "type" : "record",
-    "name" : "EmptyRecord",
-    "namespace" : "com.linkedin.restli.common",
-    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
-    "fields" : [ ],
-    "validate" : {
-      "com.linkedin.restli.common.EmptyRecordValidator" : { }
-    }
-  } ],
+  }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
   "schema" : {
     "name" : "flowexecutions",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index e3b171f..034fca0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -1,36 +1,13 @@
 {
   "models" : [ {
     "type" : "record",
-    "name" : "FlowStatusId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a specific execution of a flow",
-    "fields" : [ {
-      "name" : "flowName",
-      "type" : "string",
-      "doc" : "Name of the flow"
-    }, {
-      "name" : "flowGroup",
-      "type" : "string",
-      "doc" : "Group of the flow. This defines the namespace for the flow."
-    }, {
-      "name" : "flowExecutionId",
-      "type" : "long",
-      "doc" : "Execution id for the flow"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "FlowStatistics",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Statistics from a flow execution",
-    "fields" : [ {
-      "name" : "executionStartTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution began"
-    }, {
-      "name" : "executionEndTime",
-      "type" : "long",
-      "doc" : "Epoch time of when the execution ended"
-    } ]
+    "name" : "EmptyRecord",
+    "namespace" : "com.linkedin.restli.common",
+    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+    "fields" : [ ],
+    "validate" : {
+      "com.linkedin.restli.common.EmptyRecordValidator" : { }
+    }
   }, {
     "type" : "enum",
     "name" : "ExecutionStatus",
@@ -38,14 +15,14 @@
     "doc" : "Execution status for a flow or job",
     "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
+      "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
-      "PENDING" : "Flow or job is in pending state.",
-      "PENDING_RETRY" : "Flow or job is pending retry.",
-      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
-      "RUNNING" : "Flow or job is currently executing",
       "COMPLETE" : "Flow or job completed execution",
       "FAILED" : "Flow or job failed",
-      "CANCELLED" : "Flow cancelled."
+      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+      "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RETRY" : "Flow or job is pending retry.",
+      "RUNNING" : "Flow or job is currently executing"
     }
   }, {
     "type" : "record",
@@ -58,8 +35,8 @@
       "doc" : "Name of the flow",
       "validate" : {
         "strlen" : {
-          "min" : 1,
-          "max" : 128
+          "max" : 128,
+          "min" : 1
         }
       }
     }, {
@@ -68,42 +45,16 @@
       "doc" : "Group of the flow. This defines the namespace for the flow.",
       "validate" : {
         "strlen" : {
-          "min" : 1,
-          "max" : 128
-        }
-      }
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobId",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Identifier for a Gobblin job",
-    "fields" : [ {
-      "name" : "jobName",
-      "type" : "string",
-      "doc" : "Name of the job",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 600
-        }
-      }
-    }, {
-      "name" : "jobGroup",
-      "type" : "string",
-      "doc" : "Group of the job. This defines the namespace for the job.",
-      "validate" : {
-        "strlen" : {
-          "min" : 1,
-          "max" : 128
+          "max" : 128,
+          "min" : 1
         }
       }
     } ]
   }, {
     "type" : "record",
-    "name" : "JobStatistics",
+    "name" : "FlowStatistics",
     "namespace" : "org.apache.gobblin.service",
-    "doc" : "Statistics from a job execution",
+    "doc" : "Statistics from a flow execution",
     "fields" : [ {
       "name" : "executionStartTime",
       "type" : "long",
@@ -112,59 +63,6 @@
       "name" : "executionEndTime",
       "type" : "long",
       "doc" : "Epoch time of when the execution ended"
-    }, {
-      "name" : "processedCount",
-      "type" : "long",
-      "doc" : "number of records processed in the last job execution"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobState",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Job state that is updated at the end of an execution",
-    "fields" : [ {
-      "name" : "lowWatermark",
-      "type" : "string",
-      "doc" : "Low watermark after last execution"
-    }, {
-      "name" : "highWatermark",
-      "type" : "string",
-      "doc" : "High watermark after last execution"
-    } ]
-  }, {
-    "type" : "record",
-    "name" : "JobStatus",
-    "namespace" : "org.apache.gobblin.service",
-    "doc" : "Execution status for a job",
-    "fields" : [ {
-      "name" : "flowId",
-      "type" : "FlowId",
-      "doc" : "Identifier of the flow the job belongs to"
-    }, {
-      "name" : "jobId",
-      "type" : "JobId",
-      "doc" : "Identifier of the job"
-    }, {
-      "name" : "jobTag",
-      "type" : "string",
-      "doc" : "Tag of the job",
-      "optional" : true
-    }, {
-      "name" : "executionStatus",
-      "type" : "ExecutionStatus",
-      "doc" : "Job execution status"
-    }, {
-      "name" : "message",
-      "type" : "string",
-      "doc" : "Error or status message"
-    }, {
-      "name" : "executionStatistics",
-      "type" : "JobStatistics",
-      "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
-    }, {
-      "name" : "jobState",
-      "type" : "JobState",
-      "doc" : "Job state that is updated only at the start and end of a job execution."
     } ]
   }, {
     "type" : "record",
@@ -173,7 +71,24 @@
     "doc" : "Status of a flow",
     "fields" : [ {
       "name" : "id",
-      "type" : "FlowStatusId",
+      "type" : {
+        "type" : "record",
+        "name" : "FlowStatusId",
+        "doc" : "Identifier for a specific execution of a flow",
+        "fields" : [ {
+          "name" : "flowName",
+          "type" : "string",
+          "doc" : "Name of the flow"
+        }, {
+          "name" : "flowGroup",
+          "type" : "string",
+          "doc" : "Group of the flow. This defines the namespace for the flow."
+        }, {
+          "name" : "flowExecutionId",
+          "type" : "long",
+          "doc" : "Execution id for the flow"
+        } ]
+      },
       "doc" : "Flow status identifier"
     }, {
       "name" : "executionStatistics",
@@ -191,21 +106,106 @@
       "name" : "jobStatuses",
       "type" : {
         "type" : "array",
-        "items" : "JobStatus"
+        "items" : {
+          "type" : "record",
+          "name" : "JobStatus",
+          "doc" : "Execution status for a job",
+          "fields" : [ {
+            "name" : "flowId",
+            "type" : "FlowId",
+            "doc" : "Identifier of the flow the job belongs to"
+          }, {
+            "name" : "jobId",
+            "type" : {
+              "type" : "record",
+              "name" : "JobId",
+              "doc" : "Identifier for a Gobblin job",
+              "fields" : [ {
+                "name" : "jobName",
+                "type" : "string",
+                "doc" : "Name of the job",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 600,
+                    "min" : 1
+                  }
+                }
+              }, {
+                "name" : "jobGroup",
+                "type" : "string",
+                "doc" : "Group of the job. This defines the namespace for the job.",
+                "validate" : {
+                  "strlen" : {
+                    "max" : 128,
+                    "min" : 1
+                  }
+                }
+              } ]
+            },
+            "doc" : "Identifier of the job"
+          }, {
+            "name" : "jobTag",
+            "type" : "string",
+            "doc" : "Tag of the job",
+            "optional" : true
+          }, {
+            "name" : "executionStatus",
+            "type" : "ExecutionStatus",
+            "doc" : "Job execution status"
+          }, {
+            "name" : "message",
+            "type" : "string",
+            "doc" : "Error or status message"
+          }, {
+            "name" : "metrics",
+            "type" : "string",
+            "doc" : "metrics information",
+            "optional" : true
+          }, {
+            "name" : "executionStatistics",
+            "type" : {
+              "type" : "record",
+              "name" : "JobStatistics",
+              "doc" : "Statistics from a job execution",
+              "fields" : [ {
+                "name" : "executionStartTime",
+                "type" : "long",
+                "doc" : "Epoch time of when the execution began"
+              }, {
+                "name" : "executionEndTime",
+                "type" : "long",
+                "doc" : "Epoch time of when the execution ended"
+              }, {
+                "name" : "processedCount",
+                "type" : "long",
+                "doc" : "number of records processed in the last job execution"
+              } ]
+            },
+            "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
+          }, {
+            "name" : "jobState",
+            "type" : {
+              "type" : "record",
+              "name" : "JobState",
+              "doc" : "Job state that is updated at the end of an execution",
+              "fields" : [ {
+                "name" : "lowWatermark",
+                "type" : "string",
+                "doc" : "Low watermark after last execution"
+              }, {
+                "name" : "highWatermark",
+                "type" : "string",
+                "doc" : "High watermark after last execution"
+              } ]
+            },
+            "doc" : "Job state that is updated only at the start and end of a job execution."
+          } ]
+        }
       },
       "doc" : "Status of jobs belonging to the flow"
     } ],
     "deprecated" : "Use FlowExecution instead"
-  }, {
-    "type" : "record",
-    "name" : "EmptyRecord",
-    "namespace" : "com.linkedin.restli.common",
-    "doc" : "An literally empty record.  Intended as a marker to indicate the absence of content where a record type is required.  If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this.  For example,  CreateRequest extends Request<EmptyRecord> to indicate it has no response body.   Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
-    "fields" : [ ],
-    "validate" : {
-      "com.linkedin.restli.common.EmptyRecordValidator" : { }
-    }
-  } ],
+  }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
   "schema" : {
     "name" : "flowstatuses",
     "namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java
new file mode 100644
index 0000000..18e78d3
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import com.linkedin.r2.RemoteInvocationException;
+import com.linkedin.r2.message.RequestContext;
+import com.linkedin.restli.client.Request;
+import com.linkedin.restli.client.Response;
+import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.RestLiResponseException;
+
+
+/**
+ * Utils to be used by clients
+ */
+public class FlowClientUtils {
+  /**
+   * Send a restli {@link Request} to the server through a {@link RestClient}, but if the request is rejected due to not
+   * being sent to a leader node, get the leader node from the errorDetails and retry the request with that node by setting
+   * the D2-Hint-TargetService attribute.
+   * @param restClient rest client to use to send the request
+   * @param request request to send
+   * @param primaryResource resource part of the request URL (e.g. flowconfigsV2, which can be taken from
+   *        {@link FlowconfigsV2RequestBuilders#getPrimaryResource()}
+   * @return {@link Response} returned from the request
+   * @throws RemoteInvocationException
+   */
+  public static Response<?> sendRequestWithRetry(RestClient restClient, Request<?> request, String primaryResource) throws RemoteInvocationException {
+    Response<?> response;
+    try {
+      response = restClient.sendRequest(request).getResponse();
+    } catch (RestLiResponseException exception) {
+      if (exception.hasErrorDetails() && exception.getErrorDetails().containsKey(ServiceConfigKeys.LEADER_URL)) {
+        String leaderUrl = exception.getErrorDetails().getString(ServiceConfigKeys.LEADER_URL);
+        RequestContext requestContext = new RequestContext();
+        try {
+          requestContext.putLocalAttr("D2-Hint-TargetService", new URI(leaderUrl + "/" + primaryResource));
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Could not build URI for for url " + leaderUrl, e);
+        }
+        response = restClient.sendRequest(request, requestContext).getResponse();
+      } else {
+        throw exception;
+      }
+    }
+    return response;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 27dd79a..64269d4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -115,10 +115,9 @@ public class FlowConfigV2Client implements Closeable {
 
     CreateIdEntityRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> request =
         _flowconfigsV2RequestBuilders.createAndGet().input(flowConfig).build();
-    ResponseFuture<IdEntityResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig>> flowConfigResponseFuture =
-        _restClient.get().sendRequest(request);
+    Response<?> response = FlowClientUtils.sendRequestWithRetry(_restClient.get(), request, FlowconfigsV2RequestBuilders.getPrimaryResource());
 
-    return createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
+    return createFlowStatusId(response.getLocation().toString());
   }
 
   private FlowStatusId createFlowStatusId(String locationHeader) {
@@ -156,9 +155,7 @@ public class FlowConfigV2Client implements Closeable {
         _flowconfigsV2RequestBuilders.update().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
             .input(flowConfig).build();
 
-    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest);
-
-    response.getResponse();
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), updateRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
   }
 
   /**
@@ -175,9 +172,7 @@ public class FlowConfigV2Client implements Closeable {
         _flowconfigsV2RequestBuilders.partialUpdate().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
             .input(flowConfigPatch).build();
 
-    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(partialUpdateRequest);
-
-    response.getResponse();
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), partialUpdateRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
   }
 
   /**
@@ -240,9 +235,8 @@ public class FlowConfigV2Client implements Closeable {
 
     DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
         .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
-    ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
 
-    response.getResponse();
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), deleteRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index c78e26a..ffae759 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service;
 
+import com.linkedin.restli.client.DeleteRequest;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -160,6 +161,21 @@ public class FlowExecutionClient implements Closeable {
     }
   }
 
+  /**
+   * Kill the flow with given FlowStatusId
+   * @param flowStatusId identifier of flow execution to kill
+   * @throws RemoteInvocationException
+   */
+  public void deleteFlowExecution(FlowStatusId flowStatusId)
+      throws RemoteInvocationException {
+    LOG.debug("deleteFlowExecution with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+        flowStatusId.getFlowName() + " flowExecutionId " + flowStatusId.getFlowExecutionId());
+
+    DeleteRequest<FlowExecution> deleteRequest = _flowexecutionsRequestBuilders.delete()
+        .id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord())).build();
+
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), deleteRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+  }
 
   @Override
   public void close()
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 5a738c6..54922f4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -17,22 +17,12 @@
 
 package org.apache.gobblin.service;
 
-import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
 import com.google.inject.Inject;
-import com.linkedin.data.template.SetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.PagingContext;
-import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
@@ -41,21 +31,16 @@ import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-
 
 /**
  * Resource for handling flow execution requests
  */
 @RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
 public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
-  private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
-  public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
-  public static final String MESSAGE_SEPARATOR = ", ";
+  public static final String FLOW_EXECUTION_GENERATOR_INJECT_NAME = "FlowExecutionResourceHandler";
 
-  @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
-  FlowStatusGenerator _flowStatusGenerator;
+  @Inject @javax.inject.Inject @javax.inject.Named(FLOW_EXECUTION_GENERATOR_INJECT_NAME)
+  FlowExecutionResourceHandler flowExecutionResourceHandler;
 
   public FlowExecutionResource() {}
 
@@ -66,25 +51,13 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
    */
   @Override
   public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
-    if (flowExecution == null) {
-      throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowStatusId " + key.getKey()
-      + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
-    }
-    return flowExecution;
+    return this.flowExecutionResourceHandler.get(key);
   }
 
   @Finder("latestFlowExecution")
   public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
       @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
-
-    if (flowStatuses != null) {
-      return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
-    }
-
-    throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
-        + ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
+    return this.flowExecutionResourceHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
   }
 
   /**
@@ -94,107 +67,7 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    Long flowExecutionId = key.getKey().getFlowExecutionId();
-    _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
-    return new UpdateResponse(HttpStatus.S_200_OK);
-  }
-
-  public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
-      FlowStatusGenerator flowStatusGenerator) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    long flowExecutionId = key.getKey().getFlowExecutionId();
-
-    LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
-
-    return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
-  }
-
-  public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
-      Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
-    if (count == null) {
-      count = 1;
-    }
-    LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
-    return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
-  }
-
-  /**
-   * Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
-   * @param monitoringFlowStatus
-   * @return a {@link FlowExecution} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
-   */
-  public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
-    if (monitoringFlowStatus == null) {
-      return null;
-    }
-
-    Iterator<org.apache.gobblin.service.monitoring.JobStatus> jobStatusIter = monitoringFlowStatus.getJobStatusIterator();
-    JobStatusArray jobStatusArray = new JobStatusArray();
-    FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
-        .setFlowGroup(monitoringFlowStatus.getFlowGroup());
-
-    long flowEndTime = 0L;
-    ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
-    String flowMessage = "";
-
-    while (jobStatusIter.hasNext()) {
-      org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
-
-      // Check if this is the flow status instead of a single job status
-      if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
-        flowEndTime = queriedJobStatus.getEndTime();
-        flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
-        if (queriedJobStatus.getMessage() != null) {
-          flowMessage = queriedJobStatus.getMessage();
-        }
-        continue;
-      }
-
-      JobStatus jobStatus = new JobStatus();
-
-      jobStatus.setFlowId(flowId)
-          .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
-              .setJobGroup(queriedJobStatus.getJobGroup()))
-          .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
-          .setExecutionStatistics(new JobStatistics()
-              .setExecutionStartTime(queriedJobStatus.getStartTime())
-              .setExecutionEndTime(queriedJobStatus.getEndTime())
-              .setProcessedCount(queriedJobStatus.getProcessedCount()))
-          .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
-          .setMessage(queriedJobStatus.getMessage())
-          .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
-              setHighWatermark(queriedJobStatus.getHighWatermark()));
-
-      if (!Strings.isNullOrEmpty(queriedJobStatus.getMetrics())) {
-        jobStatus.setMetrics(queriedJobStatus.getMetrics());
-      }
-
-      jobStatusArray.add(jobStatus);
-    }
-
-    jobStatusArray.sort(Comparator.comparing((JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime()));
-
-    return new FlowExecution()
-        .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
-            .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
-        .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
-            .setExecutionEndTime(flowEndTime))
-        .setMessage(flowMessage)
-        .setExecutionStatus(flowExecutionStatus)
-        .setJobStatuses(jobStatusArray);
-  }
-
-  /**
-   * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
-   * assumed to be the flow start time.
-   */
-  private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
-    return flowStatus.getFlowExecutionId();
+    return this.flowExecutionResourceHandler.delete(key);
   }
 }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
new file mode 100644
index 0000000..918f5ac
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Context;
+import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.QueryParam;
+
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
+
+public interface FlowExecutionResourceHandler {
+  /**
+   * Get {@link FlowExecution}
+   */
+  public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+
+  /**
+   * Get latest {@link FlowExecution}
+   */
+  public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus);
+
+  /**
+   * Kill a running {@link FlowExecution}
+   */
+  public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
similarity index 73%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index 5a738c6..318289d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -14,91 +14,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.service;
-
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.google.common.base.Strings;
-import com.google.inject.Inject;
 import com.linkedin.data.template.SetMode;
+import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Finder;
 import com.linkedin.restli.server.annotations.Optional;
 import com.linkedin.restli.server.annotations.QueryParam;
-import com.linkedin.restli.server.annotations.RestLiCollection;
-import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
 
 
-/**
- * Resource for handling flow execution requests
- */
-@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
-public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
-  private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
-  public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
-  public static final String MESSAGE_SEPARATOR = ", ";
+@Slf4j
+public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceHandler {
 
-  @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
-  FlowStatusGenerator _flowStatusGenerator;
+  private final FlowStatusGenerator flowStatusGenerator;
 
-  public FlowExecutionResource() {}
+  public FlowExecutionResourceLocalHandler(FlowStatusGenerator flowStatusGenerator) {
+    this.flowStatusGenerator = flowStatusGenerator;
+  }
 
-  /**
-   * Retrieve the FlowExecution with the given key
-   * @param key {@link FlowStatusId} of flow to get
-   * @return corresponding {@link FlowExecution}
-   */
   @Override
   public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+    FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this.flowStatusGenerator));
     if (flowExecution == null) {
       throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowStatusId " + key.getKey()
-      + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
+          + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
     }
     return flowExecution;
   }
 
-  @Finder("latestFlowExecution")
-  public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
-      @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
+  @Override
+  public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus) {
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this.flowStatusGenerator);
 
     if (flowStatuses != null) {
-      return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
+      return flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
     }
 
     throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
         + ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
   }
 
-  /**
-   * Kill the FlowExecution with the given key
-   * @param key {@link FlowStatusId} of flow to kill
-   * @return {@link UpdateResponse}
-   */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    Long flowExecutionId = key.getKey().getFlowExecutionId();
-    _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
-    return new UpdateResponse(HttpStatus.S_200_OK);
+    throw new UnsupportedOperationException("Delete should be handled in GobblinServiceFlowConfigResourceHandler");
   }
 
   public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
@@ -107,17 +92,17 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
     String flowName = key.getKey().getFlowName();
     long flowExecutionId = key.getKey().getFlowExecutionId();
 
-    LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
+    log.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
 
     return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
   }
 
-  public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
+  public static List<FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
       Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
     if (count == null) {
       count = 1;
     }
-    LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
+    log.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
 
     return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
   }
@@ -197,4 +182,3 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
     return flowStatus.getFlowExecutionId();
   }
 }
-
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 4c8c623..2940d10 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -58,13 +58,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
   @Override
   public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
     // this returns null to raise a 404 error if flowStatus is null
-    return convertFlowStatus(FlowExecutionResource.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+    return convertFlowStatus(FlowExecutionResourceLocalHandler.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
   }
 
   @Finder("latestFlowStatus")
   public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
       @QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
-    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
+    List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResourceLocalHandler
+        .getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
 
     if (flowStatuses != null) {
       return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -81,7 +82,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
    * @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
    */
   private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
-    FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(monitoringFlowStatus);
+    FlowExecution flowExecution = FlowExecutionResourceLocalHandler.convertFlowStatus(monitoringFlowStatus);
     return new FlowStatus()
         .setId(flowExecution.getId())
         .setExecutionStatistics(flowExecution.getExecutionStatistics())
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 785c351..7546e12 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -24,7 +24,6 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
 
 import lombok.Builder;
 
@@ -41,7 +40,6 @@ public class FlowStatusGenerator {
   public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
-  private final EventBus eventBus;
 
   /**
    * Get the flow statuses of last <code>count</code> (or fewer) executions
@@ -171,11 +169,4 @@ public class FlowStatusGenerator {
     String status = jobStatus.getEventName().toUpperCase();
     return !FINISHED_STATUSES.contains(status);
   }
-
-  /**
-   * Send kill request for the given flow
-   */
-  public void killFlow(String flowGroup, String flowName, Long flowExecutionId) {
-    this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
-  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 60781f9..0643b6a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -56,7 +56,6 @@ import com.google.inject.Module;
 import com.google.inject.name.Names;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.r2.RemoteInvocationException;
-import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -78,6 +77,9 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
@@ -94,6 +96,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
@@ -148,6 +151,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   protected GobblinServiceFlowConfigResourceHandler resourceHandler;
   @Getter
   protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
+  @Getter
+  protected GobblinServiceFlowExecutionResourceHandler flowExecutionResourceHandler;
+  @Getter
+  protected FlowStatusGenerator flowStatusGenerator;
 
   protected boolean flowCatalogLocalCommit;
   @Getter
@@ -256,11 +263,15 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       this.serviceLauncher.addService(this.jobStatusMonitor);
     }
 
+    this.flowStatusGenerator = buildFlowStatusGenerator(this.config);
+
     // Initialize ServiceScheduler
     this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
     if (isSchedulerEnabled) {
       this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), Optional.of(LOGGER));
+      this.orchestrator.setFlowStatusGenerator(this.flowStatusGenerator);
+
       SchedulerService schedulerService = new SchedulerService(ConfigUtils.configToProperties(config));
 
       this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager,
@@ -271,17 +282,24 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     }
 
     // Initialize RestLI
+    boolean forceLeader = ConfigUtils.getBoolean(this.config, ServiceConfigKeys.FORCE_LEADER, ServiceConfigKeys.DEFAULT_FORCE_LEADER);
+
     this.resourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
         this.flowCatalogLocalCommit,
         new FlowConfigResourceLocalHandler(this.flowCatalog),
         this.helixManager,
-        this.scheduler);
+        this.scheduler,
+        forceLeader);
 
     this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
         this.flowCatalogLocalCommit,
         new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
         this.helixManager,
-        this.scheduler);
+        this.scheduler,
+        forceLeader);
+
+    this.flowExecutionResourceHandler = new GobblinServiceFlowExecutionResourceHandler(new FlowExecutionResourceLocalHandler(this.flowStatusGenerator),
+        this.eventBus, this.helixManager, forceLeader);
 
     this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
@@ -296,6 +314,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
           binder.bind(FlowConfigsResourceHandler.class)
               .annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME))
               .toInstance(GobblinServiceManager.this.v2ResourceHandler);
+          binder.bind(FlowExecutionResourceHandler.class)
+              .annotatedWith(Names.named(FlowExecutionResource.FLOW_EXECUTION_GENERATOR_INJECT_NAME))
+              .toInstance(GobblinServiceManager.this.flowExecutionResourceHandler);
           binder.bindConstant()
               .annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
               .to(Boolean.TRUE);
@@ -359,8 +380,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
    */
   private HelixManager buildHelixManager(Config config, String zkConnectionString) {
     String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
-    String helixInstanceName = ConfigUtils.getString(config, ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY,
-        GobblinServiceManager.class.getSimpleName());
+    String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
 
     LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: " + zkConnectionString);
     HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
@@ -639,7 +659,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       try (GobblinServiceManager gobblinServiceManager = new GobblinServiceManager(
           cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
           config, Optional.<Path>absent())) {
-        gobblinServiceManager.getOrchestrator().setFlowStatusGenerator(gobblinServiceManager.buildFlowStatusGenerator(config));
         gobblinServiceManager.start();
 
         if (isTestMode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 0d862d1..12ce857 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -63,16 +63,19 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
   private FlowConfigResourceLocalHandler localHandler;
   private Optional<HelixManager> helixManager;
   private GobblinServiceJobScheduler jobScheduler;
+  private boolean forceLeader;
 
   public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit,
       FlowConfigResourceLocalHandler handler,
       Optional<HelixManager> manager,
-      GobblinServiceJobScheduler jobScheduler) {
+      GobblinServiceJobScheduler jobScheduler,
+      boolean forceLeader) {
     this.flowCatalogLocalCommit = flowCatalogLocalCommit;
     this.serviceName = serviceName;
     this.localHandler = handler;
     this.helixManager = manager;
     this.jobScheduler = jobScheduler;
+    this.forceLeader = forceLeader;
   }
 
   @Override
@@ -117,6 +120,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
 
     checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, flowGroup);
 
+    if (forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(helixManager);
+    }
+
     try {
       if (!jobScheduler.isActive() && helixManager.isPresent()) {
         CreateResponse response = null;
@@ -167,6 +174,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
 
     checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, flowName, flowGroup);
 
+    if (forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(helixManager);
+    }
+
     try {
       if (!jobScheduler.isActive() && helixManager.isPresent()) {
 
@@ -223,6 +234,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
 
     checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, flowName, flowGroup);
 
+    if (forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(helixManager);
+    }
+
     try {
       if (!jobScheduler.isActive() && helixManager.isPresent()) {
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
new file mode 100644
index 0000000..2a94fb0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.FlowExecution;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+
+
+/**
+ * {@link FlowExecutionResourceHandler} that calls underlying resource handler, but does extra work that requires objects
+ * like the {@link HelixManager}. For now, that is just checking leadership and sending the kill through the eventBus
+ * for the delete method.
+ */
+@Slf4j
+public class GobblinServiceFlowExecutionResourceHandler implements FlowExecutionResourceHandler {
+  private FlowExecutionResourceHandler localHandler;
+  private EventBus eventBus;
+  private Optional<HelixManager> helixManager;
+  private boolean forceLeader;
+
+  public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceHandler handler, EventBus eventBus,
+      Optional<HelixManager> manager, boolean forceLeader) {
+    this.localHandler = handler;
+    this.eventBus = eventBus;
+    this.helixManager = manager;
+    this.forceLeader = forceLeader;
+  }
+
+  @Override
+  public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    return this.localHandler.get(key);
+  }
+
+  @Override
+  public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus) {
+    return this.localHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
+  }
+
+  @Override
+  public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    Long flowExecutionId = key.getKey().getFlowExecutionId();
+    if (this.forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(this.helixManager);
+    }
+    this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
index f2c1c84..9c4efbc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
@@ -17,23 +17,43 @@
 
 package org.apache.gobblin.service.modules.utils;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.UUID;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
+import org.slf4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.data.DataMap;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.slf4j.Logger;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 @Alpha
+@Slf4j
 public class HelixUtils {
+  public static final String HELIX_INSTANCE_NAME_SEPARATOR = "@";
 
   /***
    * Build a Helix Manager (Helix Controller instance).
@@ -107,4 +127,60 @@ public class HelixUtils {
       logger.error(String.format("Failed to send the %s message to the participants", message));
     }
   }
+
+  private static String getUrlFromHelixInstanceName(String helixInstanceName) {
+    if (!helixInstanceName.contains(HELIX_INSTANCE_NAME_SEPARATOR)) {
+      return null;
+    } else {
+      String url = helixInstanceName.substring(helixInstanceName.indexOf(HELIX_INSTANCE_NAME_SEPARATOR) + 1);
+      try {
+        return URLDecoder.decode(url, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException("Failed to decode URL from helix instance name", e);
+      }
+    }
+  }
+
+  private static String getLeaderUrl(HelixManager helixManager) {
+    PropertyKey key = helixManager.getHelixDataAccessor().keyBuilder().controllerLeader();
+    LiveInstance leader = helixManager.getHelixDataAccessor().getProperty(key);
+    return getUrlFromHelixInstanceName(leader.getInstanceName());
+  }
+
+  /**
+   * If this host is not the leader, throw a {@link RestLiServiceException}, and include the URL of the leader host in
+   * the message and in the errorDetails under the key {@link ServiceConfigKeys#LEADER_URL}.
+   */
+  public static void throwErrorIfNotLeader(Optional<HelixManager> helixManager)  {
+    if (helixManager.isPresent() && !helixManager.get().isLeader()) {
+      String leaderUrl = getLeaderUrl(helixManager.get());
+      if (leaderUrl == null) {
+        throw new RuntimeException("Request sent to slave node but could not get leader node URL");
+      }
+      RestLiServiceException exception = new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "Request must be sent to leader node at URL " + leaderUrl);
+      exception.setErrorDetails(new DataMap(ImmutableMap.of(ServiceConfigKeys.LEADER_URL, leaderUrl)));
+      throw exception;
+    }
+  }
+
+  /**
+   * Build helix instance name by getting {@link org.apache.gobblin.service.ServiceConfigKeys#HELIX_INSTANCE_NAME_KEY}
+   * and appending the host, port, and service name with a separator
+   */
+  public static String buildHelixInstanceName(Config config, String defaultInstanceName) {
+    String helixInstanceName = ConfigUtils
+        .getString(config, ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, defaultInstanceName);
+
+    String url = "";
+    try {
+      url = ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_URL_PREFIX, "https://")
+          + InetAddress.getLocalHost().getHostName() + ":" + ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_PORT, "")
+          + "/" + ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_NAME, "");
+      url = HELIX_INSTANCE_NAME_SEPARATOR + URLEncoder.encode(url, "UTF-8");
+    } catch (UnknownHostException | UnsupportedEncodingException e) {
+      log.warn("Failed to construct helix instance name", e);
+    }
+
+    return helixInstanceName + url;
+  }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
new file mode 100644
index 0000000..ce4cf71
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.RestLiResponseException;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigClient;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PortUtils;
+
+
+@Test
+public class GobblinServiceRedirectTest {
+
+  private static final Logger logger = LoggerFactory.getLogger(GobblinServiceRedirectTest.class);
+
+  private static final String QUARTZ_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
+  private static final String QUARTZ_THREAD_POOL_COUNT = "org.quartz.threadPool.threadCount";
+
+  private static final File BASE_PATH1 = Files.createTempDir();
+  private static final String NODE_1_SERVICE_WORK_DIR = new Path(BASE_PATH1.getAbsolutePath(), "serviceWorkDirNode1").toString();
+  private static final String NODE_1_TOPOLOGY_SPEC_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "topologyTestSpecStoreNode1").toString();
+  private static final String NODE_1_FLOW_SPEC_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "flowTestSpecStore").toString();
+  private static final String NODE_1_JOB_STATUS_STATE_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "fsJobStatusRetriever").toString();
+
+  private static final File BASE_PATH2 = Files.createTempDir();
+  private static final String NODE_2_SERVICE_WORK_DIR = new Path(BASE_PATH2.getAbsolutePath(), "serviceWorkDirNode2").toString();
+  private static final String NODE_2_TOPOLOGY_SPEC_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "topologyTestSpecStoreNode2").toString();
+  private static final String NODE_2_FLOW_SPEC_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "flowTestSpecStore").toString();
+  private static final String NODE_2_JOB_STATUS_STATE_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "fsJobStatusRetriever").toString();
+
+  private static final String TEST_HELIX_CLUSTER_NAME = "testRedirectGobblinServiceCluster";
+
+  private static final String TEST_GROUP_NAME_1 = "testRedirectGroup1";
+  private static final String TEST_FLOW_NAME_1 = "testRedirectFlow1";
+  private static final String TEST_SCHEDULE_1 = "0 1/0 * ? * *";
+  private static final String TEST_TEMPLATE_URI_1 = "FS:///templates/test.template";
+
+  private static final String TEST_GROUP_NAME_2 = "testRedirectGroup2";
+  private static final String TEST_FLOW_NAME_2 = "testRedirectFlow2";
+  private static final String TEST_SCHEDULE_2 = "0 1/0 * ? * *";
+  private static final String TEST_TEMPLATE_URI_2 = "FS:///templates/test.template";
+
+  private static final String TEST_GOBBLIN_EXECUTOR_NAME = "testRedirectGobblinExecutor";
+  private static final String TEST_SOURCE_NAME = "testSource";
+  private static final String TEST_SINK_NAME = "testSink";
+
+  private String port1 = "10000";
+  private String port2 = "20000";
+
+  private static final String PREFIX = "https://";
+  private static final String SERVICE_NAME = "gobblinServiceTest";
+
+  private GobblinServiceManager node1GobblinServiceManager;
+  private FlowConfigClient node1FlowConfigClient;
+
+  private GobblinServiceManager node2GobblinServiceManager;
+  private FlowConfigClient node2FlowConfigClient;
+
+  private TestingServer testingZKServer;
+
+  private Properties node1ServiceCoreProperties;
+  private Properties node2ServiceCoreProperties;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    port1 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
+    port2 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
+
+    BASE_PATH1.deleteOnExit();
+    BASE_PATH2.deleteOnExit();
+
+    // Use a random ZK port
+    this.testingZKServer = new TestingServer(-1);
+    logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
+    HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
+
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Properties commonServiceCoreProperties = new Properties();
+    commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
+    commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
+    commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY , TEST_GOBBLIN_EXECUTOR_NAME);
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".description",
+        "StandaloneTestExecutor");
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".version",
+        "1");
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
+        "gobblinExecutor");
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
+        "org.gobblin.service.InMemorySpecExecutor");
+    commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
+        TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+    commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
+    commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+    commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+
+    commonServiceCoreProperties.put(ServiceConfigKeys.FORCE_LEADER, true);
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_URL_PREFIX, PREFIX);
+    commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_NAME, SERVICE_NAME);
+
+    node1ServiceCoreProperties = new Properties();
+    node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
+    node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_1_TOPOLOGY_SPEC_STORE_DIR);
+    node1ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
+    node1ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_1_JOB_STATUS_STATE_STORE_DIR);
+    node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "RedirectQuartzScheduler1");
+    node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
+    node1ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port1);
+
+    node2ServiceCoreProperties = new Properties();
+    node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
+    node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_2_TOPOLOGY_SPEC_STORE_DIR);
+    node2ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
+    node2ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_2_JOB_STATUS_STATE_STORE_DIR);
+    node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "RedirectQuartzScheduler2");
+    node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
+    node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
+
+    // Start Node 1
+    this.node1GobblinServiceManager = new GobblinServiceManager("RedirectCoreService1", "1",
+        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+    this.node1GobblinServiceManager.start();
+
+    // Start Node 2
+    this.node2GobblinServiceManager = new GobblinServiceManager("RedirectCoreService2", "2",
+        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+    this.node2GobblinServiceManager.start();
+
+    // Initialize Node 1 Client
+    Map<String, String> transportClientProperties = Maps.newHashMap();
+    transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
+    this.node1FlowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
+        this.node1GobblinServiceManager.restliServer.getPort()), transportClientProperties);
+
+    // Initialize Node 2 Client
+    this.node2FlowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
+        this.node2GobblinServiceManager.restliServer.getPort()), transportClientProperties);
+  }
+
+  @AfterClass
+  public void cleanUp() throws Exception {
+    // Shutdown Node 1
+    try {
+      logger.info("+++++++++++++++++++ start shutdown noad1");
+      this.node1GobblinServiceManager.stop();
+    } catch (Exception e) {
+      logger.warn("Could not cleanly stop Node 1 of Gobblin Service", e);
+    }
+
+    // Shutdown Node 2
+    try {
+      logger.info("+++++++++++++++++++ start shutdown noad2");
+      this.node2GobblinServiceManager.stop();
+    } catch (Exception e) {
+      logger.warn("Could not cleanly stop Node 2 of Gobblin Service", e);
+    }
+
+    // Stop Zookeeper
+    try {
+      this.testingZKServer.close();
+    } catch (Exception e) {
+      logger.warn("Could not cleanly stop Testing Zookeeper", e);
+    }
+  }
+
+  @Test
+  public void testCreate() throws Exception {
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
+    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+
+    FlowConfig flowConfig1 = new FlowConfig()
+        .setId(new FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1))
+        .setTemplateUris(TEST_TEMPLATE_URI_1).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE_1).
+            setRunImmediately(true))
+        .setProperties(new StringMap(flowProperties));
+    FlowConfig flowConfig2 = new FlowConfig()
+        .setId(new FlowId().setFlowGroup(TEST_GROUP_NAME_2).setFlowName(TEST_FLOW_NAME_2))
+        .setTemplateUris(TEST_TEMPLATE_URI_2).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE_2).
+            setRunImmediately(true))
+        .setProperties(new StringMap(flowProperties));
+
+    GobblinServiceManager leader;
+    FlowConfigClient leaderClient;
+    FlowConfigClient slaveClient;
+    if (this.node1GobblinServiceManager.isLeader()) {
+      leader = this.node1GobblinServiceManager;
+      leaderClient = this.node1FlowConfigClient;
+      slaveClient = this.node2FlowConfigClient;
+    } else {
+      leader = this.node2GobblinServiceManager;
+      leaderClient = this.node2FlowConfigClient;
+      slaveClient = this.node1FlowConfigClient;
+    }
+
+    // Try create on leader, should be successful
+    leaderClient.createFlowConfig(flowConfig1);
+
+    // Try create on slave, should throw an error with leader URL
+    try {
+      slaveClient.createFlowConfig(flowConfig2);
+    } catch (RestLiResponseException e) {
+      Assert.assertTrue(e.hasErrorDetails());
+      Assert.assertTrue(e.getErrorDetails().containsKey(ServiceConfigKeys.LEADER_URL));
+      String expectedUrl = PREFIX + InetAddress.getLocalHost().getHostName() + ":" + leader.restliServer.getPort() + "/" + SERVICE_NAME;
+      Assert.assertEquals(e.getErrorDetails().get(ServiceConfigKeys.LEADER_URL), expectedUrl);
+      return;
+    }
+
+    throw new RuntimeException("Slave should have thrown an error");
+  }
+}
\ No newline at end of file