You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/01 22:23:28 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 98adee9  [GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas
98adee9 is described below

commit 98adee9dd5d26dbf520c2c5d2e2cd3574eb134a2
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Aug 1 15:23:18 2019 -0700

    [GOBBLIN-840] avoid creating a flow execution id by both master and slave gaas
    
    Closes #2698 from
    arjun4084346/deduplicateFlowExecutionId
---
 .../gobblin/service/FlowConfigResourceLocalHandler.java     | 13 ++++++++++++-
 .../gobblin/service/FlowConfigV2ResourceLocalHandler.java   |  4 +---
 .../restli/GobblinServiceFlowConfigResourceHandler.java     |  7 +++++++
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 19544ad..b20b5be 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -243,7 +243,18 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       // If it is not a run-once job, we should not add flow execution id here,
       // because execution id is generated for every scheduled execution of the flow and cannot be materialized to
       // the flow catalog. In this case, this id is added during flow compilation.
-      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis()));
+      String flowExecutionId;
+      if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+        flowExecutionId = flowConfig.getProperties().get(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+        // FLOW_EXECUTION_ID may already be present in FlowSpec in cases
+        // where the FlowSpec is forwarded by a slave to the master.
+        log.info("Using the existing flowExecutionId {} for {},{}", flowExecutionId, flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName());
+      } else {
+        flowExecutionId = String.valueOf(System.currentTimeMillis());
+        log.info("Created a flowExecutionId {} for {},{}", flowExecutionId, flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName());
+      }
+      flowConfig.getProperties().put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
     }
 
     if (flowConfig.hasExplain()) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index effc415..9fcfec1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -20,13 +20,11 @@ import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
-import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.util.PatchApplier;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,7 +48,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     this.createFlow.mark();
 
     if (flowConfig.hasExplain()) {
-      createLog += " explain " + Boolean.toString(flowConfig.isExplain());
+      createLog += " explain " + flowConfig.isExplain();
     }
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index d056fe1..2be2572 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -37,6 +37,7 @@ import com.linkedin.restli.server.util.PatchApplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigLoggedException;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -96,6 +97,12 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
     String flowName = flowConfig.getId().getFlowName();
     String flowGroup = flowConfig.getId().getFlowGroup();
 
+    if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          String.format("%s cannot be set by the user", ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
+          null);
+    }
+
     checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, flowGroup);
 
     try {