Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/08/10 02:03:53 UTC

[GitHub] [gobblin] aplex commented on a change in pull request #3357: [GOBBLIN-1509] Announce flow failure on DagManager::addDag error

aplex commented on a change in pull request #3357:
URL: https://github.com/apache/gobblin/pull/3357#discussion_r685634849



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -288,13 +287,28 @@ public void orchestrate(Spec spec) throws Exception {
       flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
-      if (flowCompilationTimer != null) {
-        flowCompilationTimer.stop(flowMetadata);
+      if (flowCompilationTimer.isPresent()) {
+        flowCompilationTimer.get().stop(flowMetadata);
       }
 
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        try {
+          //Send the dag to the DagManager.
+          this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        } catch (Exception ex) {
+          // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
+          String context = String.format("flow: %s; exec: %s",
+              DagManagerUtils.getFlowId(jobExecutionPlanDag).toString(),
+              DagManagerUtils.getFlowExecId(jobExecutionPlanDag));
+          String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
+          _log.warn(String.format("[%s] %s", context, failureMessage));

Review comment:
       I think there is an overload of `warn` that accepts an exception argument. It's better to use that than to put the exception text in the message. It produces cleaner log messages, and it also puts stack traces in separate fields when we forward logs to Kafka.
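
A minimal sketch of this suggestion, not the PR's actual code. To stay runnable without extra dependencies it uses `java.util.logging`, where the matching overload is `Logger.log(Level, String, Throwable)`. The class name `WarnWithThrowable`, the helper `warnAddDagFailure`, and the in-memory `CapturingHandler` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class WarnWithThrowable {

  /** Hypothetical in-memory handler, used only so the example can inspect what was logged. */
  public static final class CapturingHandler extends Handler {
    public final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      records.add(record);
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
  }

  /**
   * Logs the failure with the exception passed as a separate argument,
   * instead of concatenating ex.getMessage() into the message text.
   */
  public static void warnAddDagFailure(Logger log, String context, Exception ex) {
    String message = String.format("[%s] Failed to add Job Execution Plan", context);
    // The Throwable travels on the log record itself, so handlers and
    // formatters can emit the stack trace separately from the message.
    log.log(Level.WARNING, message, ex);
  }

  public static void main(String[] args) {
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false); // keep the demo output limited to the lines below
    CapturingHandler handler = new CapturingHandler();
    log.addHandler(handler);

    warnAddDagFailure(log, "flow: exampleFlow; exec: 1", new IllegalStateException("connection lost"));

    LogRecord record = handler.records.get(0);
    System.out.println("message: " + record.getMessage());
    System.out.println("thrown:  " + record.getThrown());
  }
}
```

Assuming `_log` in `Orchestrator` is an SLF4J logger, the analogous call there would be `_log.warn(String.format("[%s] Failed to add Job Execution Plan", context), ex)`, using SLF4J's `warn(String msg, Throwable t)` overload.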

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -288,13 +287,28 @@ public void orchestrate(Spec spec) throws Exception {
       flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
-      if (flowCompilationTimer != null) {
-        flowCompilationTimer.stop(flowMetadata);
+      if (flowCompilationTimer.isPresent()) {
+        flowCompilationTimer.get().stop(flowMetadata);
       }
 
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        try {
+          //Send the dag to the DagManager.
+          this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        } catch (Exception ex) {

Review comment:
       How was this error handled previously? I would expect that it was already logged somewhere, so I wonder whether we'll now get two errors in the logs for the same problem.



