You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/11/03 22:14:13 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f276724 [GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client
f276724 is described below
commit f2767242fe863307feaf68332f0d3617f30e8e4b
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Nov 3 14:14:06 2020 -0800
[GOBBLIN-1285] Add option to only accept requests to leader node and redirect requests in the client
Closes #3124 from jack-moseley/gaas-redirect
---
.../apache/gobblin/service/ServiceConfigKeys.java | 7 +
gobblin-restli/client.gradle | 3 +
...pache.gobblin.service.flowconfigs.snapshot.json | 106 ++++---
...che.gobblin.service.flowconfigsV2.snapshot.json | 94 +++---
...he.gobblin.service.flowexecutions.snapshot.json | 332 ++++++++++-----------
...ache.gobblin.service.flowstatuses.snapshot.json | 264 ++++++++--------
.../apache/gobblin/service/FlowClientUtils.java | 66 ++++
.../apache/gobblin/service/FlowConfigV2Client.java | 16 +-
.../gobblin/service/FlowExecutionClient.java | 16 +
.../gobblin/service/FlowExecutionResource.java | 139 +--------
.../service/FlowExecutionResourceHandler.java | 52 ++++
...java => FlowExecutionResourceLocalHandler.java} | 74 ++---
.../apache/gobblin/service/FlowStatusResource.java | 7 +-
.../service/monitoring/FlowStatusGenerator.java | 9 -
.../modules/core/GobblinServiceManager.java | 31 +-
.../GobblinServiceFlowConfigResourceHandler.java | 17 +-
...GobblinServiceFlowExecutionResourceHandler.java | 83 ++++++
.../gobblin/service/modules/utils/HelixUtils.java | 82 ++++-
.../modules/core/GobblinServiceRedirectTest.java | 263 ++++++++++++++++
19 files changed, 1049 insertions(+), 612 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index e161b78..6aefffb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -115,6 +115,8 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties";
// GAAS Listerning Port
public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port";
+ public static final String SERVICE_NAME = GOBBLIN_SERVICE_PREFIX + "serviceName";
+ public static final String SERVICE_URL_PREFIX = GOBBLIN_SERVICE_PREFIX + "serviceUrlPrefix";
// Prefix for config to ServiceBasedAppLauncher that will only be used by GaaS and not orchestrated jobs
public static final String GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX = "gobblinServiceAppLauncher";
@@ -122,4 +124,9 @@ public class ServiceConfigKeys {
//Flow concurrency config key to control default service behavior.
public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
+
+ public static final String LEADER_URL = "leaderUrl";
+
+ public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + "forceLeader";
+ public static final boolean DEFAULT_FORCE_LEADER = false;
}
diff --git a/gobblin-restli/client.gradle b/gobblin-restli/client.gradle
index 215c17d..ca13aeb 100644
--- a/gobblin-restli/client.gradle
+++ b/gobblin-restli/client.gradle
@@ -47,6 +47,9 @@ if (file('extraDependencies.gradle').exists()) {
dependencies {
compile project(path: apiProject.path, configuration: 'dataTemplate')
compile project(path: apiProject.path, configuration: 'restClient')
+ compile(project(':gobblin-api')) {
+ transitive = false
+ }
testCompile project(":gobblin-restli:gobblin-restli-utils")
testCompile externalDependency.testng
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 9485f8d..86c9ee4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -1,48 +1,13 @@
{
"models" : [ {
"type" : "record",
- "name" : "FlowId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a Gobblin as a Service flow",
- "fields" : [ {
- "name" : "flowName",
- "type" : "string",
- "doc" : "Name of the flow",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- }, {
- "name" : "flowGroup",
- "type" : "string",
- "doc" : "Group of the flow. This defines the namespace for the flow.",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- } ]
- }, {
- "type" : "record",
- "name" : "Schedule",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Attributes for defining a job schedule",
- "fields" : [ {
- "name" : "cronSchedule",
- "type" : "string",
- "doc" : "Schedule for flow in cron format",
- "validate" : {
- "org.apache.gobblin.service.validator.CronValidator" : { }
- }
- }, {
- "name" : "runImmediately",
- "type" : "boolean",
- "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
- "default" : false
- } ]
+ "name" : "EmptyRecord",
+ "namespace" : "com.linkedin.restli.common",
+ "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+ "fields" : [ ],
+ "validate" : {
+ "com.linkedin.restli.common.EmptyRecordValidator" : { }
+ }
}, {
"type" : "record",
"name" : "FlowConfig",
@@ -50,11 +15,53 @@
"doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
"fields" : [ {
"name" : "id",
- "type" : "FlowId",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowId",
+ "doc" : "Identifier for a Gobblin as a Service flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow.",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ } ]
+ },
"doc" : "Identifier for the flow"
}, {
"name" : "schedule",
- "type" : "Schedule",
+ "type" : {
+ "type" : "record",
+ "name" : "Schedule",
+ "doc" : "Attributes for defining a job schedule",
+ "fields" : [ {
+ "name" : "cronSchedule",
+ "type" : "string",
+ "doc" : "Schedule for flow in cron format",
+ "validate" : {
+ "org.apache.gobblin.service.validator.CronValidator" : { }
+ }
+ }, {
+ "name" : "runImmediately",
+ "type" : "boolean",
+ "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
+ "default" : false
+ } ]
+ },
"doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.",
"optional" : true
}, {
@@ -77,16 +84,7 @@
},
"doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
} ]
- }, {
- "type" : "record",
- "name" : "EmptyRecord",
- "namespace" : "com.linkedin.restli.common",
- "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
- "fields" : [ ],
- "validate" : {
- "com.linkedin.restli.common.EmptyRecordValidator" : { }
- }
- } ],
+ }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.Schedule" ],
"schema" : {
"name" : "flowconfigs",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index fa8e0cd..13da7a5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -1,60 +1,58 @@
{
"models" : [ {
"type" : "record",
- "name" : "FlowId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a Gobblin as a Service flow",
- "fields" : [ {
- "name" : "flowName",
- "type" : "string",
- "doc" : "Name of the flow",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- }, {
- "name" : "flowGroup",
- "type" : "string",
- "doc" : "Group of the flow. This defines the namespace for the flow.",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- } ]
- }, {
- "type" : "record",
- "name" : "Schedule",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Attributes for defining a job schedule",
- "fields" : [ {
- "name" : "cronSchedule",
- "type" : "string",
- "doc" : "Schedule for flow in cron format",
- "validate" : {
- "org.apache.gobblin.service.validator.CronValidator" : { }
- }
- }, {
- "name" : "runImmediately",
- "type" : "boolean",
- "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
- "default" : false
- } ]
- }, {
- "type" : "record",
"name" : "FlowConfig",
"namespace" : "org.apache.gobblin.service",
"doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
"fields" : [ {
"name" : "id",
- "type" : "FlowId",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowId",
+ "doc" : "Identifier for a Gobblin as a Service flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow.",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ } ]
+ },
"doc" : "Identifier for the flow"
}, {
"name" : "schedule",
- "type" : "Schedule",
+ "type" : {
+ "type" : "record",
+ "name" : "Schedule",
+ "doc" : "Attributes for defining a job schedule",
+ "fields" : [ {
+ "name" : "cronSchedule",
+ "type" : "string",
+ "doc" : "Schedule for flow in cron format",
+ "validate" : {
+ "org.apache.gobblin.service.validator.CronValidator" : { }
+ }
+ }, {
+ "name" : "runImmediately",
+ "type" : "boolean",
+ "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
+ "default" : false
+ } ]
+ },
"doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.",
"optional" : true
}, {
@@ -77,7 +75,7 @@
},
"doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
} ]
- }, {
+ }, "org.apache.gobblin.service.FlowId", {
"type" : "record",
"name" : "FlowStatusId",
"namespace" : "org.apache.gobblin.service",
@@ -95,7 +93,7 @@
"type" : "long",
"doc" : "Execution id for the flow"
} ]
- } ],
+ }, "org.apache.gobblin.service.Schedule" ],
"schema" : {
"name" : "flowconfigsV2",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index ed12a21..65321d6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -1,36 +1,13 @@
{
"models" : [ {
"type" : "record",
- "name" : "FlowStatusId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a specific execution of a flow",
- "fields" : [ {
- "name" : "flowName",
- "type" : "string",
- "doc" : "Name of the flow"
- }, {
- "name" : "flowGroup",
- "type" : "string",
- "doc" : "Group of the flow. This defines the namespace for the flow."
- }, {
- "name" : "flowExecutionId",
- "type" : "long",
- "doc" : "Execution id for the flow"
- } ]
- }, {
- "type" : "record",
- "name" : "FlowStatistics",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Statistics from a flow execution",
- "fields" : [ {
- "name" : "executionStartTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution began"
- }, {
- "name" : "executionEndTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution ended"
- } ]
+ "name" : "EmptyRecord",
+ "namespace" : "com.linkedin.restli.common",
+ "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+ "fields" : [ ],
+ "validate" : {
+ "com.linkedin.restli.common.EmptyRecordValidator" : { }
+ }
}, {
"type" : "enum",
"name" : "ExecutionStatus",
@@ -38,146 +15,57 @@
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
+ "CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
- "PENDING" : "Flow or job is in pending state.",
- "PENDING_RETRY" : "Flow or job is pending retry.",
- "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
- "RUNNING" : "Flow or job is currently executing",
"COMPLETE" : "Flow or job completed execution",
"FAILED" : "Flow or job failed",
- "CANCELLED" : "Flow cancelled."
+ "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+ "PENDING" : "Flow or job is in pending state.",
+ "PENDING_RETRY" : "Flow or job is pending retry.",
+ "RUNNING" : "Flow or job is currently executing"
}
}, {
"type" : "record",
- "name" : "FlowId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a Gobblin as a Service flow",
- "fields" : [ {
- "name" : "flowName",
- "type" : "string",
- "doc" : "Name of the flow",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- }, {
- "name" : "flowGroup",
- "type" : "string",
- "doc" : "Group of the flow. This defines the namespace for the flow.",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- } ]
- }, {
- "type" : "record",
- "name" : "JobId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a Gobblin job",
- "fields" : [ {
- "name" : "jobName",
- "type" : "string",
- "doc" : "Name of the job",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 600
- }
- }
- }, {
- "name" : "jobGroup",
- "type" : "string",
- "doc" : "Group of the job. This defines the namespace for the job.",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- } ]
- }, {
- "type" : "record",
- "name" : "JobStatistics",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Statistics from a job execution",
- "fields" : [ {
- "name" : "executionStartTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution began"
- }, {
- "name" : "executionEndTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution ended"
- }, {
- "name" : "processedCount",
- "type" : "long",
- "doc" : "number of records processed in the last job execution"
- } ]
- }, {
- "type" : "record",
- "name" : "JobState",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Job state that is updated at the end of an execution",
- "fields" : [ {
- "name" : "lowWatermark",
- "type" : "string",
- "doc" : "Low watermark after last execution"
- }, {
- "name" : "highWatermark",
- "type" : "string",
- "doc" : "High watermark after last execution"
- } ]
- }, {
- "type" : "record",
- "name" : "JobStatus",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Execution status for a job",
- "fields" : [ {
- "name" : "flowId",
- "type" : "FlowId",
- "doc" : "Identifier of the flow the job belongs to"
- }, {
- "name" : "jobId",
- "type" : "JobId",
- "doc" : "Identifier of the job"
- }, {
- "name" : "jobTag",
- "type" : "string",
- "doc" : "Tag of the job",
- "optional" : true
- }, {
- "name" : "executionStatus",
- "type" : "ExecutionStatus",
- "doc" : "Job execution status"
- }, {
- "name" : "message",
- "type" : "string",
- "doc" : "Error or status message"
- }, {
- "name" : "executionStatistics",
- "type" : "JobStatistics",
- "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
- }, {
- "name" : "jobState",
- "type" : "JobState",
- "doc" : "Job state that is updated only at the start and end of a job execution."
- } ]
- }, {
- "type" : "record",
"name" : "FlowExecution",
"namespace" : "org.apache.gobblin.service",
"doc" : "Represents an execution of a flow",
"fields" : [ {
"name" : "id",
- "type" : "FlowStatusId",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowStatusId",
+ "doc" : "Identifier for a specific execution of a flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow"
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow."
+ }, {
+ "name" : "flowExecutionId",
+ "type" : "long",
+ "doc" : "Execution id for the flow"
+ } ]
+ },
"doc" : "Flow status identifier"
}, {
"name" : "executionStatistics",
- "type" : "FlowStatistics",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowStatistics",
+ "doc" : "Statistics from a flow execution",
+ "fields" : [ {
+ "name" : "executionStartTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution began"
+ }, {
+ "name" : "executionEndTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution ended"
+ } ]
+ },
"doc" : "Statistics for this execution of the flow"
}, {
"name" : "executionStatus",
@@ -191,20 +79,130 @@
"name" : "jobStatuses",
"type" : {
"type" : "array",
- "items" : "JobStatus"
+ "items" : {
+ "type" : "record",
+ "name" : "JobStatus",
+ "doc" : "Execution status for a job",
+ "fields" : [ {
+ "name" : "flowId",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowId",
+ "doc" : "Identifier for a Gobblin as a Service flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow.",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ } ]
+ },
+ "doc" : "Identifier of the flow the job belongs to"
+ }, {
+ "name" : "jobId",
+ "type" : {
+ "type" : "record",
+ "name" : "JobId",
+ "doc" : "Identifier for a Gobblin job",
+ "fields" : [ {
+ "name" : "jobName",
+ "type" : "string",
+ "doc" : "Name of the job",
+ "validate" : {
+ "strlen" : {
+ "max" : 600,
+ "min" : 1
+ }
+ }
+ }, {
+ "name" : "jobGroup",
+ "type" : "string",
+ "doc" : "Group of the job. This defines the namespace for the job.",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ } ]
+ },
+ "doc" : "Identifier of the job"
+ }, {
+ "name" : "jobTag",
+ "type" : "string",
+ "doc" : "Tag of the job",
+ "optional" : true
+ }, {
+ "name" : "executionStatus",
+ "type" : "ExecutionStatus",
+ "doc" : "Job execution status"
+ }, {
+ "name" : "message",
+ "type" : "string",
+ "doc" : "Error or status message"
+ }, {
+ "name" : "metrics",
+ "type" : "string",
+ "doc" : "metrics information",
+ "optional" : true
+ }, {
+ "name" : "executionStatistics",
+ "type" : {
+ "type" : "record",
+ "name" : "JobStatistics",
+ "doc" : "Statistics from a job execution",
+ "fields" : [ {
+ "name" : "executionStartTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution began"
+ }, {
+ "name" : "executionEndTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution ended"
+ }, {
+ "name" : "processedCount",
+ "type" : "long",
+ "doc" : "number of records processed in the last job execution"
+ } ]
+ },
+ "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
+ }, {
+ "name" : "jobState",
+ "type" : {
+ "type" : "record",
+ "name" : "JobState",
+ "doc" : "Job state that is updated at the end of an execution",
+ "fields" : [ {
+ "name" : "lowWatermark",
+ "type" : "string",
+ "doc" : "Low watermark after last execution"
+ }, {
+ "name" : "highWatermark",
+ "type" : "string",
+ "doc" : "High watermark after last execution"
+ } ]
+ },
+ "doc" : "Job state that is updated only at the start and end of a job execution."
+ } ]
+ }
},
"doc" : "Status of jobs belonging to the flow"
} ]
- }, {
- "type" : "record",
- "name" : "EmptyRecord",
- "namespace" : "com.linkedin.restli.common",
- "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
- "fields" : [ ],
- "validate" : {
- "com.linkedin.restli.common.EmptyRecordValidator" : { }
- }
- } ],
+ }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.FlowStatistics", "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
"schema" : {
"name" : "flowexecutions",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index e3b171f..034fca0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -1,36 +1,13 @@
{
"models" : [ {
"type" : "record",
- "name" : "FlowStatusId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a specific execution of a flow",
- "fields" : [ {
- "name" : "flowName",
- "type" : "string",
- "doc" : "Name of the flow"
- }, {
- "name" : "flowGroup",
- "type" : "string",
- "doc" : "Group of the flow. This defines the namespace for the flow."
- }, {
- "name" : "flowExecutionId",
- "type" : "long",
- "doc" : "Execution id for the flow"
- } ]
- }, {
- "type" : "record",
- "name" : "FlowStatistics",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Statistics from a flow execution",
- "fields" : [ {
- "name" : "executionStartTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution began"
- }, {
- "name" : "executionEndTime",
- "type" : "long",
- "doc" : "Epoch time of when the execution ended"
- } ]
+ "name" : "EmptyRecord",
+ "namespace" : "com.linkedin.restli.common",
+ "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
+ "fields" : [ ],
+ "validate" : {
+ "com.linkedin.restli.common.EmptyRecordValidator" : { }
+ }
}, {
"type" : "enum",
"name" : "ExecutionStatus",
@@ -38,14 +15,14 @@
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
+ "CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
- "PENDING" : "Flow or job is in pending state.",
- "PENDING_RETRY" : "Flow or job is pending retry.",
- "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
- "RUNNING" : "Flow or job is currently executing",
"COMPLETE" : "Flow or job completed execution",
"FAILED" : "Flow or job failed",
- "CANCELLED" : "Flow cancelled."
+ "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
+ "PENDING" : "Flow or job is in pending state.",
+ "PENDING_RETRY" : "Flow or job is pending retry.",
+ "RUNNING" : "Flow or job is currently executing"
}
}, {
"type" : "record",
@@ -58,8 +35,8 @@
"doc" : "Name of the flow",
"validate" : {
"strlen" : {
- "min" : 1,
- "max" : 128
+ "max" : 128,
+ "min" : 1
}
}
}, {
@@ -68,42 +45,16 @@
"doc" : "Group of the flow. This defines the namespace for the flow.",
"validate" : {
"strlen" : {
- "min" : 1,
- "max" : 128
- }
- }
- } ]
- }, {
- "type" : "record",
- "name" : "JobId",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Identifier for a Gobblin job",
- "fields" : [ {
- "name" : "jobName",
- "type" : "string",
- "doc" : "Name of the job",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 600
- }
- }
- }, {
- "name" : "jobGroup",
- "type" : "string",
- "doc" : "Group of the job. This defines the namespace for the job.",
- "validate" : {
- "strlen" : {
- "min" : 1,
- "max" : 128
+ "max" : 128,
+ "min" : 1
}
}
} ]
}, {
"type" : "record",
- "name" : "JobStatistics",
+ "name" : "FlowStatistics",
"namespace" : "org.apache.gobblin.service",
- "doc" : "Statistics from a job execution",
+ "doc" : "Statistics from a flow execution",
"fields" : [ {
"name" : "executionStartTime",
"type" : "long",
@@ -112,59 +63,6 @@
"name" : "executionEndTime",
"type" : "long",
"doc" : "Epoch time of when the execution ended"
- }, {
- "name" : "processedCount",
- "type" : "long",
- "doc" : "number of records processed in the last job execution"
- } ]
- }, {
- "type" : "record",
- "name" : "JobState",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Job state that is updated at the end of an execution",
- "fields" : [ {
- "name" : "lowWatermark",
- "type" : "string",
- "doc" : "Low watermark after last execution"
- }, {
- "name" : "highWatermark",
- "type" : "string",
- "doc" : "High watermark after last execution"
- } ]
- }, {
- "type" : "record",
- "name" : "JobStatus",
- "namespace" : "org.apache.gobblin.service",
- "doc" : "Execution status for a job",
- "fields" : [ {
- "name" : "flowId",
- "type" : "FlowId",
- "doc" : "Identifier of the flow the job belongs to"
- }, {
- "name" : "jobId",
- "type" : "JobId",
- "doc" : "Identifier of the job"
- }, {
- "name" : "jobTag",
- "type" : "string",
- "doc" : "Tag of the job",
- "optional" : true
- }, {
- "name" : "executionStatus",
- "type" : "ExecutionStatus",
- "doc" : "Job execution status"
- }, {
- "name" : "message",
- "type" : "string",
- "doc" : "Error or status message"
- }, {
- "name" : "executionStatistics",
- "type" : "JobStatistics",
- "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
- }, {
- "name" : "jobState",
- "type" : "JobState",
- "doc" : "Job state that is updated only at the start and end of a job execution."
} ]
}, {
"type" : "record",
@@ -173,7 +71,24 @@
"doc" : "Status of a flow",
"fields" : [ {
"name" : "id",
- "type" : "FlowStatusId",
+ "type" : {
+ "type" : "record",
+ "name" : "FlowStatusId",
+ "doc" : "Identifier for a specific execution of a flow",
+ "fields" : [ {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "Name of the flow"
+ }, {
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "Group of the flow. This defines the namespace for the flow."
+ }, {
+ "name" : "flowExecutionId",
+ "type" : "long",
+ "doc" : "Execution id for the flow"
+ } ]
+ },
"doc" : "Flow status identifier"
}, {
"name" : "executionStatistics",
@@ -191,21 +106,106 @@
"name" : "jobStatuses",
"type" : {
"type" : "array",
- "items" : "JobStatus"
+ "items" : {
+ "type" : "record",
+ "name" : "JobStatus",
+ "doc" : "Execution status for a job",
+ "fields" : [ {
+ "name" : "flowId",
+ "type" : "FlowId",
+ "doc" : "Identifier of the flow the job belongs to"
+ }, {
+ "name" : "jobId",
+ "type" : {
+ "type" : "record",
+ "name" : "JobId",
+ "doc" : "Identifier for a Gobblin job",
+ "fields" : [ {
+ "name" : "jobName",
+ "type" : "string",
+ "doc" : "Name of the job",
+ "validate" : {
+ "strlen" : {
+ "max" : 600,
+ "min" : 1
+ }
+ }
+ }, {
+ "name" : "jobGroup",
+ "type" : "string",
+ "doc" : "Group of the job. This defines the namespace for the job.",
+ "validate" : {
+ "strlen" : {
+ "max" : 128,
+ "min" : 1
+ }
+ }
+ } ]
+ },
+ "doc" : "Identifier of the job"
+ }, {
+ "name" : "jobTag",
+ "type" : "string",
+ "doc" : "Tag of the job",
+ "optional" : true
+ }, {
+ "name" : "executionStatus",
+ "type" : "ExecutionStatus",
+ "doc" : "Job execution status"
+ }, {
+ "name" : "message",
+ "type" : "string",
+ "doc" : "Error or status message"
+ }, {
+ "name" : "metrics",
+ "type" : "string",
+ "doc" : "metrics information",
+ "optional" : true
+ }, {
+ "name" : "executionStatistics",
+ "type" : {
+ "type" : "record",
+ "name" : "JobStatistics",
+ "doc" : "Statistics from a job execution",
+ "fields" : [ {
+ "name" : "executionStartTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution began"
+ }, {
+ "name" : "executionEndTime",
+ "type" : "long",
+ "doc" : "Epoch time of when the execution ended"
+ }, {
+ "name" : "processedCount",
+ "type" : "long",
+ "doc" : "number of records processed in the last job execution"
+ } ]
+ },
+ "doc" : "Statistics from the job execution. The values may be updated during the run of a job."
+ }, {
+ "name" : "jobState",
+ "type" : {
+ "type" : "record",
+ "name" : "JobState",
+ "doc" : "Job state that is updated at the end of an execution",
+ "fields" : [ {
+ "name" : "lowWatermark",
+ "type" : "string",
+ "doc" : "Low watermark after last execution"
+ }, {
+ "name" : "highWatermark",
+ "type" : "string",
+ "doc" : "High watermark after last execution"
+ } ]
+ },
+ "doc" : "Job state that is updated only at the start and end of a job execution."
+ } ]
+ }
},
"doc" : "Status of jobs belonging to the flow"
} ],
"deprecated" : "Use FlowExecution instead"
- }, {
- "type" : "record",
- "name" : "EmptyRecord",
- "namespace" : "com.linkedin.restli.common",
- "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
- "fields" : [ ],
- "validate" : {
- "com.linkedin.restli.common.EmptyRecordValidator" : { }
- }
- } ],
+ }, "org.apache.gobblin.service.FlowStatusId", "org.apache.gobblin.service.JobId", "org.apache.gobblin.service.JobState", "org.apache.gobblin.service.JobStatistics", "org.apache.gobblin.service.JobStatus" ],
"schema" : {
"name" : "flowstatuses",
"namespace" : "org.apache.gobblin.service",
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java
new file mode 100644
index 0000000..18e78d3
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowClientUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import com.linkedin.r2.RemoteInvocationException;
+import com.linkedin.r2.message.RequestContext;
+import com.linkedin.restli.client.Request;
+import com.linkedin.restli.client.Response;
+import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.RestLiResponseException;
+
+
+/**
+ * Utils to be used by clients
+ */
+public class FlowClientUtils {
+  /**
+   * Send a restli {@link Request} to the server through a {@link RestClient}. If the request is rejected because it
+   * was not sent to the leader node, read the leader URL from the error details and retry the request once against
+   * that node by setting the D2-Hint-TargetService attribute on the request context.
+   * @param restClient rest client to use to send the request
+   * @param request request to send
+   * @param primaryResource resource part of the request URL (e.g. flowconfigsV2, which can be taken from
+   *                        {@link FlowconfigsV2RequestBuilders#getPrimaryResource()})
+   * @return {@link Response} returned from the request, or from the retried request if a retry happened
+   * @throws RemoteInvocationException if the request fails without providing a leader URL, or if the retry fails
+   */
+  public static Response<?> sendRequestWithRetry(RestClient restClient, Request<?> request, String primaryResource) throws RemoteInvocationException {
+    try {
+      return restClient.sendRequest(request).getResponse();
+    } catch (RestLiResponseException exception) {
+      // Only a rejection whose error details carry the leader URL can be retried; anything else is rethrown as-is.
+      if (!exception.hasErrorDetails() || !exception.getErrorDetails().containsKey(ServiceConfigKeys.LEADER_URL)) {
+        throw exception;
+      }
+
+      String leaderUrl = exception.getErrorDetails().getString(ServiceConfigKeys.LEADER_URL);
+      RequestContext requestContext = new RequestContext();
+      try {
+        requestContext.putLocalAttr("D2-Hint-TargetService", new URI(leaderUrl + "/" + primaryResource));
+      } catch (URISyntaxException e) {
+        throw new RuntimeException("Could not build URI for url " + leaderUrl, e);
+      }
+      // The retry is attempted exactly once; a failure of the retried request propagates to the caller.
+      return restClient.sendRequest(request, requestContext).getResponse();
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 27dd79a..64269d4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -115,10 +115,9 @@ public class FlowConfigV2Client implements Closeable {
CreateIdEntityRequest<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> request =
_flowconfigsV2RequestBuilders.createAndGet().input(flowConfig).build();
- ResponseFuture<IdEntityResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig>> flowConfigResponseFuture =
- _restClient.get().sendRequest(request);
+ Response<?> response = FlowClientUtils.sendRequestWithRetry(_restClient.get(), request, FlowconfigsV2RequestBuilders.getPrimaryResource());
- return createFlowStatusId(flowConfigResponseFuture.getResponse().getLocation().toString());
+ return createFlowStatusId(response.getLocation().toString());
}
private FlowStatusId createFlowStatusId(String locationHeader) {
@@ -156,9 +155,7 @@ public class FlowConfigV2Client implements Closeable {
_flowconfigsV2RequestBuilders.update().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
.input(flowConfig).build();
- ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(updateRequest);
-
- response.getResponse();
+ FlowClientUtils.sendRequestWithRetry(_restClient.get(), updateRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
}
/**
@@ -175,9 +172,7 @@ public class FlowConfigV2Client implements Closeable {
_flowconfigsV2RequestBuilders.partialUpdate().id(new ComplexResourceKey<>(flowId, new FlowStatusId()))
.input(flowConfigPatch).build();
- ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(partialUpdateRequest);
-
- response.getResponse();
+ FlowClientUtils.sendRequestWithRetry(_restClient.get(), partialUpdateRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
}
/**
@@ -240,9 +235,8 @@ public class FlowConfigV2Client implements Closeable {
DeleteRequest<FlowConfig> deleteRequest = _flowconfigsV2RequestBuilders.delete()
.id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
- ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
- response.getResponse();
+ FlowClientUtils.sendRequestWithRetry(_restClient.get(), deleteRequest, FlowconfigsV2RequestBuilders.getPrimaryResource());
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index c78e26a..ffae759 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service;
+import com.linkedin.restli.client.DeleteRequest;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -160,6 +161,21 @@ public class FlowExecutionClient implements Closeable {
}
}
+  /**
+   * Send a delete request for the flow execution identified by {@code flowStatusId}, which kills that execution.
+   * @param flowStatusId identifier of the flow execution to kill
+   * @throws RemoteInvocationException if sending the delete request fails
+   */
+  public void deleteFlowExecution(FlowStatusId flowStatusId)
+      throws RemoteInvocationException {
+    String debugMessage = "deleteFlowExecution with groupName " + flowStatusId.getFlowGroup() + " flowName " +
+        flowStatusId.getFlowName() + " flowExecutionId " + flowStatusId.getFlowExecutionId();
+    LOG.debug(debugMessage);
+
+    ComplexResourceKey<FlowStatusId, EmptyRecord> executionKey = new ComplexResourceKey<>(flowStatusId, new EmptyRecord());
+    DeleteRequest<FlowExecution> killRequest = _flowexecutionsRequestBuilders.delete().id(executionKey).build();
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), killRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+  }
@Override
public void close()
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 5a738c6..54922f4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -17,22 +17,12 @@
package org.apache.gobblin.service;
-import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
import com.google.inject.Inject;
-import com.linkedin.data.template.SetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.PagingContext;
-import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Context;
import com.linkedin.restli.server.annotations.Finder;
@@ -41,21 +31,16 @@ import com.linkedin.restli.server.annotations.QueryParam;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-
/**
* Resource for handling flow execution requests
*/
@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
- private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
- public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
- public static final String MESSAGE_SEPARATOR = ", ";
+ public static final String FLOW_EXECUTION_GENERATOR_INJECT_NAME = "FlowExecutionResourceHandler";
- @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
- FlowStatusGenerator _flowStatusGenerator;
+ @Inject @javax.inject.Inject @javax.inject.Named(FLOW_EXECUTION_GENERATOR_INJECT_NAME)
+ FlowExecutionResourceHandler flowExecutionResourceHandler;
public FlowExecutionResource() {}
@@ -66,25 +51,13 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
*/
@Override
public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
- if (flowExecution == null) {
- throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowStatusId " + key.getKey()
- + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
- }
- return flowExecution;
+ return this.flowExecutionResourceHandler.get(key);
}
@Finder("latestFlowExecution")
public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
@Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
-
- if (flowStatuses != null) {
- return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
- }
-
- throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
- + ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
+ return this.flowExecutionResourceHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
}
/**
@@ -94,107 +67,7 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- Long flowExecutionId = key.getKey().getFlowExecutionId();
- _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
- return new UpdateResponse(HttpStatus.S_200_OK);
- }
-
- public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
- FlowStatusGenerator flowStatusGenerator) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- long flowExecutionId = key.getKey().getFlowExecutionId();
-
- LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
-
- return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
- }
-
- public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
- Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
- if (count == null) {
- count = 1;
- }
- LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
-
- return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
- }
-
- /**
- * Forms a {@link FlowExecution} from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
- * @param monitoringFlowStatus
- * @return a {@link FlowExecution} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
- */
- public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
- if (monitoringFlowStatus == null) {
- return null;
- }
-
- Iterator<org.apache.gobblin.service.monitoring.JobStatus> jobStatusIter = monitoringFlowStatus.getJobStatusIterator();
- JobStatusArray jobStatusArray = new JobStatusArray();
- FlowId flowId = new FlowId().setFlowName(monitoringFlowStatus.getFlowName())
- .setFlowGroup(monitoringFlowStatus.getFlowGroup());
-
- long flowEndTime = 0L;
- ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
-
- String flowMessage = "";
-
- while (jobStatusIter.hasNext()) {
- org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
-
- // Check if this is the flow status instead of a single job status
- if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
- flowEndTime = queriedJobStatus.getEndTime();
- flowExecutionStatus = ExecutionStatus.valueOf(queriedJobStatus.getEventName());
- if (queriedJobStatus.getMessage() != null) {
- flowMessage = queriedJobStatus.getMessage();
- }
- continue;
- }
-
- JobStatus jobStatus = new JobStatus();
-
- jobStatus.setFlowId(flowId)
- .setJobId(new JobId().setJobName(queriedJobStatus.getJobName())
- .setJobGroup(queriedJobStatus.getJobGroup()))
- .setJobTag(queriedJobStatus.getJobTag(), SetMode.IGNORE_NULL)
- .setExecutionStatistics(new JobStatistics()
- .setExecutionStartTime(queriedJobStatus.getStartTime())
- .setExecutionEndTime(queriedJobStatus.getEndTime())
- .setProcessedCount(queriedJobStatus.getProcessedCount()))
- .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
- .setMessage(queriedJobStatus.getMessage())
- .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
- setHighWatermark(queriedJobStatus.getHighWatermark()));
-
- if (!Strings.isNullOrEmpty(queriedJobStatus.getMetrics())) {
- jobStatus.setMetrics(queriedJobStatus.getMetrics());
- }
-
- jobStatusArray.add(jobStatus);
- }
-
- jobStatusArray.sort(Comparator.comparing((JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime()));
-
- return new FlowExecution()
- .setId(new FlowStatusId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName())
- .setFlowExecutionId(monitoringFlowStatus.getFlowExecutionId()))
- .setExecutionStatistics(new FlowStatistics().setExecutionStartTime(getFlowStartTime(monitoringFlowStatus))
- .setExecutionEndTime(flowEndTime))
- .setMessage(flowMessage)
- .setExecutionStatus(flowExecutionStatus)
- .setJobStatuses(jobStatusArray);
- }
-
- /**
- * Return the flow start time given a {@link org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
- * assumed to be the flow start time.
- */
- private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
- return flowStatus.getFlowExecutionId();
+ return this.flowExecutionResourceHandler.delete(key);
}
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
new file mode 100644
index 0000000..918f5ac
--- /dev/null
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Context;
+import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.QueryParam;
+
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
+
+/**
+ * Handles the flow execution requests (get, latest-execution finder, delete) on behalf of the flowexecutions resource.
+ */
+public interface FlowExecutionResourceHandler {
+  /** Retrieve the {@link FlowExecution} identified by the {@link FlowStatusId} held in {@code key}. */
+  FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+
+  /**
+   * Retrieve the latest {@link FlowExecution}s of the flow identified by {@code flowId}; {@code count}, {@code tag}
+   * and {@code executionStatus} narrow the lookup.
+   */
+  List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus);
+
+  /** Kill the running {@link FlowExecution} identified by the {@link FlowStatusId} held in {@code key}. */
+  UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
similarity index 73%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index 5a738c6..318289d 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -14,91 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.service;
-
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringEscapeUtils;
import com.google.common.base.Strings;
-import com.google.inject.Inject;
import com.linkedin.data.template.SetMode;
+import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Finder;
import com.linkedin.restli.server.annotations.Optional;
import com.linkedin.restli.server.annotations.QueryParam;
-import com.linkedin.restli.server.annotations.RestLiCollection;
-import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
-/**
- * Resource for handling flow execution requests
- */
-@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
-public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
- private static final Logger LOG = LoggerFactory.getLogger(FlowExecutionResource.class);
- public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
- public static final String MESSAGE_SEPARATOR = ", ";
+@Slf4j
+public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceHandler {
- @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
- FlowStatusGenerator _flowStatusGenerator;
+ private final FlowStatusGenerator flowStatusGenerator;
- public FlowExecutionResource() {}
+ public FlowExecutionResourceLocalHandler(FlowStatusGenerator flowStatusGenerator) {
+ this.flowStatusGenerator = flowStatusGenerator;
+ }
- /**
- * Retrieve the FlowExecution with the given key
- * @param key {@link FlowStatusId} of flow to get
- * @return corresponding {@link FlowExecution}
- */
@Override
public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+ FlowExecution flowExecution = convertFlowStatus(getFlowStatusFromGenerator(key, this.flowStatusGenerator));
if (flowExecution == null) {
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowStatusId " + key.getKey()
- + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
+ + ". The flowStatusId may be incorrect, or the flow execution may have been cleaned up.");
}
return flowExecution;
}
- @Finder("latestFlowExecution")
- public List<FlowExecution> getLatestFlowExecution(@Context PagingContext context, @QueryParam("flowId") FlowId flowId,
- @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag, @Optional @QueryParam("executionStatus") String executionStatus) {
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this._flowStatusGenerator);
+ @Override
+ public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus) {
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = getLatestFlowStatusesFromGenerator(flowId, count, tag, executionStatus, this.flowStatusGenerator);
if (flowStatuses != null) {
- return flowStatuses.stream().map(FlowExecutionResource::convertFlowStatus).collect(Collectors.toList());
+ return flowStatuses.stream().map(FlowExecutionResourceLocalHandler::convertFlowStatus).collect(Collectors.toList());
}
throw new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, "No flow execution found for flowId " + flowId
+ ". The flowId may be incorrect, or the flow execution may have been cleaned up.");
}
- /**
- * Kill the FlowExecution with the given key
- * @param key {@link FlowStatusId} of flow to kill
- * @return {@link UpdateResponse}
- */
@Override
public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- Long flowExecutionId = key.getKey().getFlowExecutionId();
- _flowStatusGenerator.killFlow(flowGroup, flowName, flowExecutionId);
- return new UpdateResponse(HttpStatus.S_200_OK);
+ throw new UnsupportedOperationException("Delete should be handled in GobblinServiceFlowExecutionResourceHandler");
}
public static org.apache.gobblin.service.monitoring.FlowStatus getFlowStatusFromGenerator(ComplexResourceKey<FlowStatusId, EmptyRecord> key,
@@ -107,17 +92,17 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
String flowName = key.getKey().getFlowName();
long flowExecutionId = key.getKey().getFlowExecutionId();
- LOG.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
+ log.info("Get called with flowGroup " + flowGroup + " flowName " + flowName + " flowExecutionId " + flowExecutionId);
return flowStatusGenerator.getFlowStatus(flowName, flowGroup, flowExecutionId, null);
}
- public static List<org.apache.gobblin.service.monitoring.FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
+ public static List<FlowStatus> getLatestFlowStatusesFromGenerator(FlowId flowId,
Integer count, String tag, String executionStatus, FlowStatusGenerator flowStatusGenerator) {
if (count == null) {
count = 1;
}
- LOG.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
+ log.info("get latest called with flowGroup " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName() + " count " + count);
return flowStatusGenerator.getLatestFlowStatus(flowId.getFlowName(), flowId.getFlowGroup(), count, tag, executionStatus);
}
@@ -197,4 +182,3 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
return flowStatus.getFlowExecutionId();
}
}
-
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 4c8c623..2940d10 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -58,13 +58,14 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
@Override
public FlowStatus get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
// this returns null to raise a 404 error if flowStatus is null
- return convertFlowStatus(FlowExecutionResource.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
+ return convertFlowStatus(FlowExecutionResourceLocalHandler.getFlowStatusFromGenerator(key, this._flowStatusGenerator));
}
@Finder("latestFlowStatus")
public List<FlowStatus> getLatestFlowStatus(@Context PagingContext context,
@QueryParam("flowId") FlowId flowId, @Optional @QueryParam("count") Integer count, @Optional @QueryParam("tag") String tag) {
- List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResource.getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
+ List<org.apache.gobblin.service.monitoring.FlowStatus> flowStatuses = FlowExecutionResourceLocalHandler
+ .getLatestFlowStatusesFromGenerator(flowId, count, tag, null, this._flowStatusGenerator);
if (flowStatuses != null) {
return flowStatuses.stream().map(this::convertFlowStatus).collect(Collectors.toList());
@@ -81,7 +82,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId,
* @return a {@link org.apache.gobblin.service.FlowStatus} converted from a {@link org.apache.gobblin.service.monitoring.FlowStatus}
*/
private FlowStatus convertFlowStatus(org.apache.gobblin.service.monitoring.FlowStatus monitoringFlowStatus) {
- FlowExecution flowExecution = FlowExecutionResource.convertFlowStatus(monitoringFlowStatus);
+ FlowExecution flowExecution = FlowExecutionResourceLocalHandler.convertFlowStatus(monitoringFlowStatus);
return new FlowStatus()
.setId(flowExecution.getId())
.setExecutionStatistics(flowExecution.getExecutionStatistics())
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 785c351..7546e12 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -24,7 +24,6 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
import lombok.Builder;
@@ -41,7 +40,6 @@ public class FlowStatusGenerator {
public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
- private final EventBus eventBus;
/**
* Get the flow statuses of last <code>count</code> (or fewer) executions
@@ -171,11 +169,4 @@ public class FlowStatusGenerator {
String status = jobStatus.getEventName().toUpperCase();
return !FINISHED_STATUSES.contains(status);
}
-
- /**
- * Send kill request for the given flow
- */
- public void killFlow(String flowGroup, String flowName, Long flowExecutionId) {
- this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
- }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 60781f9..0643b6a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -56,7 +56,6 @@ import com.google.inject.Module;
import com.google.inject.name.Names;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.RemoteInvocationException;
-import com.linkedin.restli.server.resources.BaseResource;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -78,6 +77,9 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
@@ -94,6 +96,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.utils.HelixUtils;
@@ -148,6 +151,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
protected GobblinServiceFlowConfigResourceHandler resourceHandler;
@Getter
protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
+ @Getter
+ protected GobblinServiceFlowExecutionResourceHandler flowExecutionResourceHandler;
+ @Getter
+ protected FlowStatusGenerator flowStatusGenerator;
protected boolean flowCatalogLocalCommit;
@Getter
@@ -256,11 +263,15 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.serviceLauncher.addService(this.jobStatusMonitor);
}
+ this.flowStatusGenerator = buildFlowStatusGenerator(this.config);
+
// Initialize ServiceScheduler
this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
if (isSchedulerEnabled) {
this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), Optional.of(LOGGER));
+ this.orchestrator.setFlowStatusGenerator(this.flowStatusGenerator);
+
SchedulerService schedulerService = new SchedulerService(ConfigUtils.configToProperties(config));
this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager,
@@ -271,17 +282,24 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
// Initialize RestLI
+ boolean forceLeader = ConfigUtils.getBoolean(this.config, ServiceConfigKeys.FORCE_LEADER, ServiceConfigKeys.DEFAULT_FORCE_LEADER);
+
this.resourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
this.flowCatalogLocalCommit,
new FlowConfigResourceLocalHandler(this.flowCatalog),
this.helixManager,
- this.scheduler);
+ this.scheduler,
+ forceLeader);
this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
this.flowCatalogLocalCommit,
new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
this.helixManager,
- this.scheduler);
+ this.scheduler,
+ forceLeader);
+
+ this.flowExecutionResourceHandler = new GobblinServiceFlowExecutionResourceHandler(new FlowExecutionResourceLocalHandler(this.flowStatusGenerator),
+ this.eventBus, this.helixManager, forceLeader);
this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
@@ -296,6 +314,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
binder.bind(FlowConfigsResourceHandler.class)
.annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME))
.toInstance(GobblinServiceManager.this.v2ResourceHandler);
+ binder.bind(FlowExecutionResourceHandler.class)
+ .annotatedWith(Names.named(FlowExecutionResource.FLOW_EXECUTION_GENERATOR_INJECT_NAME))
+ .toInstance(GobblinServiceManager.this.flowExecutionResourceHandler);
binder.bindConstant()
.annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
.to(Boolean.TRUE);
@@ -359,8 +380,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
*/
private HelixManager buildHelixManager(Config config, String zkConnectionString) {
String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
- String helixInstanceName = ConfigUtils.getString(config, ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY,
- GobblinServiceManager.class.getSimpleName());
+ String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: " + zkConnectionString);
HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
@@ -639,7 +659,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
try (GobblinServiceManager gobblinServiceManager = new GobblinServiceManager(
cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
config, Optional.<Path>absent())) {
- gobblinServiceManager.getOrchestrator().setFlowStatusGenerator(gobblinServiceManager.buildFlowStatusGenerator(config));
gobblinServiceManager.start();
if (isTestMode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 0d862d1..12ce857 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -63,16 +63,19 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
private FlowConfigResourceLocalHandler localHandler;
private Optional<HelixManager> helixManager;
private GobblinServiceJobScheduler jobScheduler;
+ private boolean forceLeader;
public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit,
FlowConfigResourceLocalHandler handler,
Optional<HelixManager> manager,
- GobblinServiceJobScheduler jobScheduler) {
+ GobblinServiceJobScheduler jobScheduler,
+ boolean forceLeader) {
this.flowCatalogLocalCommit = flowCatalogLocalCommit;
this.serviceName = serviceName;
this.localHandler = handler;
this.helixManager = manager;
this.jobScheduler = jobScheduler;
+ this.forceLeader = forceLeader;
}
@Override
@@ -117,6 +120,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, flowGroup);
+ if (forceLeader) {
+ HelixUtils.throwErrorIfNotLeader(helixManager);
+ }
+
try {
if (!jobScheduler.isActive() && helixManager.isPresent()) {
CreateResponse response = null;
@@ -167,6 +174,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_UPDATE, flowName, flowGroup);
+ if (forceLeader) {
+ HelixUtils.throwErrorIfNotLeader(helixManager);
+ }
+
try {
if (!jobScheduler.isActive() && helixManager.isPresent()) {
@@ -223,6 +234,10 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, flowName, flowGroup);
+ if (forceLeader) {
+ HelixUtils.throwErrorIfNotLeader(helixManager);
+ }
+
try {
if (!jobScheduler.isActive() && helixManager.isPresent()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
new file mode 100644
index 0000000..2a94fb0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.UpdateResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.FlowExecution;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+
+
+/**
+ * {@link FlowExecutionResourceHandler} that delegates read requests to an underlying handler, and performs the extra
+ * work that requires objects like the {@link HelixManager}. For now, that is checking leadership and sending the kill
+ * request through the {@link EventBus} for the delete method.
+ */
+@Slf4j
+public class GobblinServiceFlowExecutionResourceHandler implements FlowExecutionResourceHandler {
+  // Collaborators are assigned once in the constructor and never reassigned.
+  private final FlowExecutionResourceHandler localHandler;
+  private final EventBus eventBus;
+  private final Optional<HelixManager> helixManager;
+  private final boolean forceLeader;
+
+  /**
+   * @param handler handler that the {@code get} and {@code getLatestFlowExecution} requests are delegated to
+   * @param eventBus bus on which a {@link KillFlowEvent} is posted for every delete request
+   * @param manager optional {@link HelixManager} used for the leadership check
+   * @param forceLeader if true, delete requests are passed through
+   *                    {@link HelixUtils#throwErrorIfNotLeader(Optional)} before the kill event is posted
+   */
+  public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceHandler handler, EventBus eventBus,
+      Optional<HelixManager> manager, boolean forceLeader) {
+    this.localHandler = handler;
+    this.eventBus = eventBus;
+    this.helixManager = manager;
+    this.forceLeader = forceLeader;
+  }
+
+  /**
+   * Delegates directly to the underlying handler; no leadership check is applied.
+   */
+  @Override
+  public FlowExecution get(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    return this.localHandler.get(key);
+  }
+
+  /**
+   * Delegates directly to the underlying handler; no leadership check is applied.
+   */
+  @Override
+  public List<FlowExecution> getLatestFlowExecution(PagingContext context, FlowId flowId, Integer count, String tag, String executionStatus) {
+    return this.localHandler.getLatestFlowExecution(context, flowId, count, tag, executionStatus);
+  }
+
+  /**
+   * Requests a kill of the flow execution identified by the key by posting a {@link KillFlowEvent} on the event bus.
+   * When {@code forceLeader} is set, the leadership check runs first and its exception propagates to the caller.
+   *
+   * @return an {@link UpdateResponse} with status 200 once the event has been posted
+   */
+  @Override
+  public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    // Read the identifying fields before the leadership check, as the key is needed either way.
+    FlowStatusId flowStatusId = key.getKey();
+    String flowGroup = flowStatusId.getFlowGroup();
+    String flowName = flowStatusId.getFlowName();
+    Long flowExecutionId = flowStatusId.getFlowExecutionId();
+    if (this.forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(this.helixManager);
+    }
+    this.eventBus.post(new KillFlowEvent(flowGroup, flowName, flowExecutionId));
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
index f2c1c84..9c4efbc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java
@@ -17,23 +17,43 @@
package org.apache.gobblin.service.modules.utils;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.UUID;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+
import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
+import org.slf4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.data.DataMap;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.server.RestLiServiceException;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
-import org.slf4j.Logger;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
@Alpha
+@Slf4j
public class HelixUtils {
+ public static final String HELIX_INSTANCE_NAME_SEPARATOR = "@";
/***
* Build a Helix Manager (Helix Controller instance).
@@ -107,4 +127,60 @@ public class HelixUtils {
logger.error(String.format("Failed to send the %s message to the participants", message));
}
}
+
+ /**
+  * Extract the URL portion of a helix instance name, i.e. everything after the first
+  * {@link #HELIX_INSTANCE_NAME_SEPARATOR}, and URL-decode it as UTF-8.
+  *
+  * @return the decoded URL, or null when the instance name contains no separator
+  */
+ private static String getUrlFromHelixInstanceName(String helixInstanceName) {
+   int separatorIndex = helixInstanceName.indexOf(HELIX_INSTANCE_NAME_SEPARATOR);
+   if (separatorIndex < 0) {
+     return null;
+   }
+
+   String encodedUrl = helixInstanceName.substring(separatorIndex + 1);
+   try {
+     return URLDecoder.decode(encodedUrl, "UTF-8");
+   } catch (UnsupportedEncodingException e) {
+     throw new RuntimeException("Failed to decode URL from helix instance name", e);
+   }
+ }
+
+ /**
+  * Look up the current controller leader through the Helix data accessor and extract the service URL that is
+  * encoded in its instance name.
+  *
+  * @return the leader URL, or null if no leader is currently registered or its instance name carries no URL
+  */
+ private static String getLeaderUrl(HelixManager helixManager) {
+   PropertyKey key = helixManager.getHelixDataAccessor().keyBuilder().controllerLeader();
+   LiveInstance leader = helixManager.getHelixDataAccessor().getProperty(key);
+   // The leader property can be absent (for example while leadership is changing hands). Report "unknown" rather
+   // than failing with a NullPointerException, so the caller's null check can raise a descriptive error.
+   if (leader == null) {
+     return null;
+   }
+   return getUrlFromHelixInstanceName(leader.getInstanceName());
+ }
+
+ /**
+  * Reject the current request when this host is not the leader. Does nothing when no {@link HelixManager} is
+  * present or when this host is the leader. Otherwise a {@link RestLiServiceException} with status 400 is thrown
+  * that names the URL of the leader host in its message and carries it in the errorDetails under the key
+  * {@link ServiceConfigKeys#LEADER_URL}. If the leader URL cannot be determined a {@link RuntimeException} is
+  * thrown instead.
+  */
+ public static void throwErrorIfNotLeader(Optional<HelixManager> helixManager) {
+   if (!helixManager.isPresent() || helixManager.get().isLeader()) {
+     return;
+   }
+
+   String leaderUrl = getLeaderUrl(helixManager.get());
+   if (leaderUrl == null) {
+     throw new RuntimeException("Request sent to slave node but could not get leader node URL");
+   }
+
+   RestLiServiceException notLeader = new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST,
+       "Request must be sent to leader node at URL " + leaderUrl);
+   notLeader.setErrorDetails(new DataMap(ImmutableMap.of(ServiceConfigKeys.LEADER_URL, leaderUrl)));
+   throw notLeader;
+ }
+
+ /**
+  * Build the helix instance name: the value of
+  * {@link org.apache.gobblin.service.ServiceConfigKeys#HELIX_INSTANCE_NAME_KEY} (or the given default), followed by
+  * {@link #HELIX_INSTANCE_NAME_SEPARATOR} and the URL-encoded service URL assembled from the configured URL prefix,
+  * the local host name, the configured port and the configured service name.
+  *
+  * If the local host name cannot be resolved or the URL cannot be encoded, a warning is logged and the instance
+  * name is returned without the separator and URL.
+  */
+ public static String buildHelixInstanceName(Config config, String defaultInstanceName) {
+   String baseName = ConfigUtils.getString(config, ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, defaultInstanceName);
+
+   String urlSuffix;
+   try {
+     String prefix = ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_URL_PREFIX, "https://");
+     String host = InetAddress.getLocalHost().getHostName();
+     String port = ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_PORT, "");
+     String serviceName = ConfigUtils.getString(config, ServiceConfigKeys.SERVICE_NAME, "");
+
+     String serviceUrl = prefix + host + ":" + port + "/" + serviceName;
+     urlSuffix = HELIX_INSTANCE_NAME_SEPARATOR + URLEncoder.encode(serviceUrl, "UTF-8");
+   } catch (UnknownHostException | UnsupportedEncodingException e) {
+     log.warn("Failed to construct helix instance name", e);
+     // Fall back to the plain instance name without an encoded URL.
+     urlSuffix = "";
+   }
+
+   return baseName + urlSuffix;
+ }
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
new file mode 100644
index 0000000..ce4cf71
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.RestLiResponseException;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigClient;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PortUtils;
+
+
+/**
+ * Test that starts two {@link GobblinServiceManager} nodes against one test ZooKeeper server and Helix cluster with
+ * {@link ServiceConfigKeys#FORCE_LEADER} enabled, and verifies that a flow config create request sent to the
+ * non-leader node fails with error details that contain the URL of the leader node.
+ */
+@Test
+public class GobblinServiceRedirectTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(GobblinServiceRedirectTest.class);
+
+ private static final String QUARTZ_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
+ private static final String QUARTZ_THREAD_POOL_COUNT = "org.quartz.threadPool.threadCount";
+
+ // Working directories for node 1, all under a dedicated temp dir
+ private static final File BASE_PATH1 = Files.createTempDir();
+ private static final String NODE_1_SERVICE_WORK_DIR = new Path(BASE_PATH1.getAbsolutePath(), "serviceWorkDirNode1").toString();
+ private static final String NODE_1_TOPOLOGY_SPEC_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "topologyTestSpecStoreNode1").toString();
+ private static final String NODE_1_FLOW_SPEC_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "flowTestSpecStore").toString();
+ private static final String NODE_1_JOB_STATUS_STATE_STORE_DIR = new Path(BASE_PATH1.getAbsolutePath(), "fsJobStatusRetriever").toString();
+
+ // Working directories for node 2, all under a second temp dir
+ private static final File BASE_PATH2 = Files.createTempDir();
+ private static final String NODE_2_SERVICE_WORK_DIR = new Path(BASE_PATH2.getAbsolutePath(), "serviceWorkDirNode2").toString();
+ private static final String NODE_2_TOPOLOGY_SPEC_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "topologyTestSpecStoreNode2").toString();
+ private static final String NODE_2_FLOW_SPEC_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "flowTestSpecStore").toString();
+ private static final String NODE_2_JOB_STATUS_STATE_STORE_DIR = new Path(BASE_PATH2.getAbsolutePath(), "fsJobStatusRetriever").toString();
+
+ private static final String TEST_HELIX_CLUSTER_NAME = "testRedirectGobblinServiceCluster";
+
+ private static final String TEST_GROUP_NAME_1 = "testRedirectGroup1";
+ private static final String TEST_FLOW_NAME_1 = "testRedirectFlow1";
+ private static final String TEST_SCHEDULE_1 = "0 1/0 * ? * *";
+ private static final String TEST_TEMPLATE_URI_1 = "FS:///templates/test.template";
+
+ private static final String TEST_GROUP_NAME_2 = "testRedirectGroup2";
+ private static final String TEST_FLOW_NAME_2 = "testRedirectFlow2";
+ private static final String TEST_SCHEDULE_2 = "0 1/0 * ? * *";
+ private static final String TEST_TEMPLATE_URI_2 = "FS:///templates/test.template";
+
+ private static final String TEST_GOBBLIN_EXECUTOR_NAME = "testRedirectGobblinExecutor";
+ private static final String TEST_SOURCE_NAME = "testSource";
+ private static final String TEST_SINK_NAME = "testSink";
+
+ // Placeholder values; both are replaced with randomly located ports at the start of setup()
+ private String port1 = "10000";
+ private String port2 = "20000";
+
+ // URL prefix and service name configured on both nodes; also used to build the expected leader URL in testCreate()
+ private static final String PREFIX = "https://";
+ private static final String SERVICE_NAME = "gobblinServiceTest";
+
+ private GobblinServiceManager node1GobblinServiceManager;
+ private FlowConfigClient node1FlowConfigClient;
+
+ private GobblinServiceManager node2GobblinServiceManager;
+ private FlowConfigClient node2FlowConfigClient;
+
+ private TestingServer testingZKServer;
+
+ private Properties node1ServiceCoreProperties;
+ private Properties node2ServiceCoreProperties;
+
+ /**
+ * Starts a test ZooKeeper server, creates the Helix cluster, starts two service nodes that share the common
+ * properties but use separate directories, Quartz scheduler names and ports, and creates a REST client for each node.
+ */
+ @BeforeClass
+ public void setup() throws Exception {
+ port1 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
+ port2 = Integer.toString(new PortUtils.ServerSocketPortLocator().random());
+
+ BASE_PATH1.deleteOnExit();
+ BASE_PATH2.deleteOnExit();
+
+ // Use a random ZK port
+ this.testingZKServer = new TestingServer(-1);
+ logger.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
+ HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(), TEST_HELIX_CLUSTER_NAME);
+
+ ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+ // Properties shared by both nodes
+ Properties commonServiceCoreProperties = new Properties();
+ commonServiceCoreProperties.put(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, testingZKServer.getConnectString());
+ commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY, TEST_HELIX_CLUSTER_NAME);
+ commonServiceCoreProperties.put(ServiceConfigKeys.HELIX_INSTANCE_NAME_KEY, "GaaS_" + UUID.randomUUID().toString());
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY , TEST_GOBBLIN_EXECUTOR_NAME);
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description",
+ "StandaloneTestExecutor");
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".version",
+ "1");
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
+ "gobblinExecutor");
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
+ "org.gobblin.service.InMemorySpecExecutor");
+ commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
+ TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
+ commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
+ commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
+ commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+
+ // Settings under test: enable the leader-only option and set the URL prefix and service name
+ commonServiceCoreProperties.put(ServiceConfigKeys.FORCE_LEADER, true);
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_URL_PREFIX, PREFIX);
+ commonServiceCoreProperties.put(ServiceConfigKeys.SERVICE_NAME, SERVICE_NAME);
+
+ // Node 1 specific properties: own directories, Quartz scheduler name and service port
+ node1ServiceCoreProperties = new Properties();
+ node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
+ node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_1_TOPOLOGY_SPEC_STORE_DIR);
+ node1ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR);
+ node1ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_1_JOB_STATUS_STATE_STORE_DIR);
+ node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "RedirectQuartzScheduler1");
+ node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
+ node1ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port1);
+
+ // Node 2 specific properties: own directories, Quartz scheduler name and service port
+ node2ServiceCoreProperties = new Properties();
+ node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
+ node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_2_TOPOLOGY_SPEC_STORE_DIR);
+ node2ServiceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR);
+ node2ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_2_JOB_STATUS_STATE_STORE_DIR);
+ node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "RedirectQuartzScheduler2");
+ node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
+ node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
+
+ // Start Node 1
+ this.node1GobblinServiceManager = new GobblinServiceManager("RedirectCoreService1", "1",
+ ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+ this.node1GobblinServiceManager.start();
+
+ // Start Node 2
+ this.node2GobblinServiceManager = new GobblinServiceManager("RedirectCoreService2", "2",
+ ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+ this.node2GobblinServiceManager.start();
+
+ // Initialize Node 1 Client
+ Map<String, String> transportClientProperties = Maps.newHashMap();
+ transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
+ this.node1FlowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
+ this.node1GobblinServiceManager.restliServer.getPort()), transportClientProperties);
+
+ // Initialize Node 2 Client
+ this.node2FlowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
+ this.node2GobblinServiceManager.restliServer.getPort()), transportClientProperties);
+ }
+
+ /**
+ * Stops both service nodes and the test ZooKeeper server. A failure in one step is logged as a warning and does not
+ * prevent the remaining steps from running.
+ */
+ @AfterClass
+ public void cleanUp() throws Exception {
+ // Shutdown Node 1
+ try {
+ logger.info("+++++++++++++++++++ start shutdown noad1");
+ this.node1GobblinServiceManager.stop();
+ } catch (Exception e) {
+ logger.warn("Could not cleanly stop Node 1 of Gobblin Service", e);
+ }
+
+ // Shutdown Node 2
+ try {
+ logger.info("+++++++++++++++++++ start shutdown noad2");
+ this.node2GobblinServiceManager.stop();
+ } catch (Exception e) {
+ logger.warn("Could not cleanly stop Node 2 of Gobblin Service", e);
+ }
+
+ // Stop Zookeeper
+ try {
+ this.testingZKServer.close();
+ } catch (Exception e) {
+ logger.warn("Could not cleanly stop Testing Zookeeper", e);
+ }
+ }
+
+ /**
+ * Creates one flow config through the leader node's client, then attempts to create a second one through the other
+ * node's client and expects a {@link RestLiResponseException} whose error details contain the leader URL.
+ */
+ @Test
+ public void testCreate() throws Exception {
+ Map<String, String> flowProperties = Maps.newHashMap();
+ flowProperties.put("param1", "value1");
+ flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
+ flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+
+ FlowConfig flowConfig1 = new FlowConfig()
+ .setId(new FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1))
+ .setTemplateUris(TEST_TEMPLATE_URI_1).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE_1).
+ setRunImmediately(true))
+ .setProperties(new StringMap(flowProperties));
+ FlowConfig flowConfig2 = new FlowConfig()
+ .setId(new FlowId().setFlowGroup(TEST_GROUP_NAME_2).setFlowName(TEST_FLOW_NAME_2))
+ .setTemplateUris(TEST_TEMPLATE_URI_2).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE_2).
+ setRunImmediately(true))
+ .setProperties(new StringMap(flowProperties));
+
+ // Determine which node currently reports itself as leader; the other node is treated as the slave
+ GobblinServiceManager leader;
+ FlowConfigClient leaderClient;
+ FlowConfigClient slaveClient;
+ if (this.node1GobblinServiceManager.isLeader()) {
+ leader = this.node1GobblinServiceManager;
+ leaderClient = this.node1FlowConfigClient;
+ slaveClient = this.node2FlowConfigClient;
+ } else {
+ leader = this.node2GobblinServiceManager;
+ leaderClient = this.node2FlowConfigClient;
+ slaveClient = this.node1FlowConfigClient;
+ }
+
+ // Try create on leader, should be successful
+ leaderClient.createFlowConfig(flowConfig1);
+
+ // Try create on slave, should throw an error with leader URL
+ try {
+ slaveClient.createFlowConfig(flowConfig2);
+ } catch (RestLiResponseException e) {
+ Assert.assertTrue(e.hasErrorDetails());
+ Assert.assertTrue(e.getErrorDetails().containsKey(ServiceConfigKeys.LEADER_URL));
+ String expectedUrl = PREFIX + InetAddress.getLocalHost().getHostName() + ":" + leader.restliServer.getPort() + "/" + SERVICE_NAME;
+ Assert.assertEquals(e.getErrorDetails().get(ServiceConfigKeys.LEADER_URL), expectedUrl);
+ // Expected path: the request was rejected and the error details carried the expected leader URL
+ return;
+ }
+
+ // Reached only when the create call on the slave returned without throwing
+ throw new RuntimeException("Slave should have thrown an error");
+ }
+}
\ No newline at end of file