You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/01 22:23:28 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 98adee9 [GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas
98adee9 is described below
commit 98adee9dd5d26dbf520c2c5d2e2cd3574eb134a2
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Aug 1 15:23:18 2019 -0700
[GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas
Closes #2698 from arjun4084346/deduplicateFlowExecutionId
---
.../gobblin/service/FlowConfigResourceLocalHandler.java | 13 ++++++++++++-
.../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 4 +---
.../restli/GobblinServiceFlowConfigResourceHandler.java | 7 +++++++
3 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 19544ad..b20b5be 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -243,7 +243,18 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
// If it is not a run-once job, we should not add flow execution id here,
// because execution id is generated for every scheduled execution of the flow and cannot be materialized to
// the flow catalog. In this case, this id is added during flow compilation.
- configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis()));
+ String flowExecutionId;
+ if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ flowExecutionId = flowConfig.getProperties().get(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ // FLOW_EXECUTION_ID may already be present in FlowSpec in cases
+ // where the FlowSpec is forwarded by a slave to the master.
+ log.info("Using the existing flowExecutionId {} for {},{}", flowExecutionId, flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName());
+ } else {
+ flowExecutionId = String.valueOf(System.currentTimeMillis());
+ log.info("Created a flowExecutionId {} for {},{}", flowExecutionId, flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName());
+ }
+ flowConfig.getProperties().put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+ configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
}
if (flowConfig.hasExplain()) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index effc415..9fcfec1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -20,13 +20,11 @@ import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import com.linkedin.data.template.StringMap;
-import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.util.PatchApplier;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +48,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
this.createFlow.mark();
if (flowConfig.hasExplain()) {
- createLog += " explain " + Boolean.toString(flowConfig.isExplain());
+ createLog += " explain " + flowConfig.isExplain();
}
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index d056fe1..2be2572 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -37,6 +37,7 @@ import com.linkedin.restli.server.util.PatchApplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigLoggedException;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -96,6 +97,12 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
String flowName = flowConfig.getId().getFlowName();
String flowGroup = flowConfig.getId().getFlowGroup();
+ if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+ String.format("%s cannot be set by the user", ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
+ null);
+ }
+
checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, flowGroup);
try {