You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/29 15:31:16 UTC

[gobblin] branch master updated: [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bf3d21210 [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)
bf3d21210 is described below

commit bf3d21210bdb647e5dfab395fa35b2afa7317688
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Wed Mar 29 08:31:09 2023 -0700

    [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)
    
    * [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl.
    
    * Correct `String.format` interpolation syntax, which differs from that of slf4j
---
 .../service/FlowConfigResourceLocalHandler.java    | 64 ++++++++++++++-
 .../service/FlowConfigV2ResourceLocalHandler.java  | 93 ----------------------
 2 files changed, 60 insertions(+), 97 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 272a8eeab..6e7b9b082 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -19,13 +19,21 @@ package org.apache.gobblin.service;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -50,6 +58,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -188,19 +197,27 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
       flowConfig = originalFlowConfig;
     }
+
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    Map<String, AddSpecResponse> responseMap;
     try {
-      this.flowCatalog.update(createFlowSpecForConfig(flowConfig), triggerListener, modifiedWatermark);
+      responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
     } catch (Throwable e) {
       // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
-      log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
+      log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
       throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
     }
-    return new UpdateResponse(HttpStatus.S_200_OK);
+
+    if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
+      return new UpdateResponse(HttpStatus.S_200_OK);
+    } else {
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
+    }
   }
 
-  protected final boolean isUnscheduleRequest(FlowConfig flowConfig) {
+  private boolean isUnscheduleRequest(FlowConfig flowConfig) {
     return Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "false"));
   }
 
@@ -305,4 +322,43 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
     }
   }
+
+  protected String getErrorMessage(FlowSpec flowSpec) {
+    StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
+    Map<String, ArrayList<String>> allErrors = new HashMap<>();
+
+    if (!flowSpec.getCompilationErrors().isEmpty()) {
+      message.append(" Compilation errors encountered (Sorted by relevance): ");
+      FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
+      Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
+      int errorIdSingleHop = 1;
+      int errorIdMultiHop = 1;
+
+      ArrayList<String> singleHopErrors = new ArrayList<>();
+      ArrayList<String> multiHopErrors = new ArrayList<>();
+
+      for (FlowSpec.CompilationError error: errors) {
+        if (error.errorPriority == 0) {
+          singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+          errorIdSingleHop++;
+        } else {
+          multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+          errorIdMultiHop++;
+        }
+      }
+
+      allErrors.put("singleHopErrors", singleHopErrors);
+      allErrors.put("multiHopErrors", multiHopErrors);
+    }
+
+    allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
+    ObjectMapper mapper = new ObjectMapper();
+
+    try {
+      return mapper.writeValueAsString(allErrors);
+    } catch (JsonProcessingException e) {
+      log.error(String.format("FlowSpec %s errored on Json processing", flowSpec.toString()), e);
+    }
+    return "Could not form JSON in " + getClass().getSimpleName();
+  }
 }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 791721f60..cd1447ad5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,13 +16,6 @@
  */
 package org.apache.gobblin.service;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Hashtable;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -115,92 +108,6 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     return new CreateKVResponse<>(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
   }
 
-  private String getErrorMessage(FlowSpec flowSpec) {
-    StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
-    Hashtable<String, ArrayList<String>> allErrors = new Hashtable<>();
-
-    if (!flowSpec.getCompilationErrors().isEmpty()) {
-      message.append(" Compilation errors encountered (Sorted by relevance): ");
-      FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
-      Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
-      int errorIdSingleHop = 1;
-      int errorIdMultiHop = 1;
-
-      ArrayList<String> singleHopErrors = new ArrayList<>();
-      ArrayList<String> multiHopErrors = new ArrayList<>();
-
-      for (FlowSpec.CompilationError error: errors) {
-        if (error.errorPriority == 0) {
-          singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
-          errorIdSingleHop++;
-        } else {
-          multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
-          errorIdMultiHop++;
-        }
-      }
-
-      allErrors.put("singleHopErrors", singleHopErrors);
-      allErrors.put("multiHopErrors", multiHopErrors);
-    }
-
-    allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
-    ObjectMapper mapper = new ObjectMapper();
-
-    try {
-      return mapper.writeValueAsString(allErrors);
-    } catch (JsonProcessingException e) {
-      log.error("Flow Spec {} errored on Json processing", flowSpec.toString(), e);
-      e.printStackTrace();
-    }
-    return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
-  }
-
-  /**
-   * Update flowConfig locally and trigger all listeners iff @param triggerListener is set to true
-   */
-  @Override
-  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener, long modifiedWatermark) {
-    log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
-
-    if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
-      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
-          "flowName and flowGroup cannot be changed in update", null);
-    }
-
-    FlowConfig originalFlowConfig = getFlowConfig(flowId);
-
-    if (!flowConfig.getProperties().containsKey(RequesterService.REQUESTER_LIST)) {
-      // Carry forward the requester list property if it is not being updated since it was added at time of creation
-      flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST));
-    }
-
-    if (isUnscheduleRequest(flowConfig)) {
-      // flow config is not changed if it is just a request to un-schedule
-      originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
-      flowConfig = originalFlowConfig;
-    }
-
-    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    Map<String, AddSpecResponse> responseMap;
-    try {
-      responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
-    } catch (QuotaExceededException e) {
-      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch (Throwable e) {
-      // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
-      log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
-      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-
-    if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
-      return new UpdateResponse(HttpStatus.S_200_OK);
-    } else {
-      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
-    }
-  }
-
-
-
   /**
    * Note: this method is only implemented for testing, normally partial update would be called in
    * GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig