You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/29 15:31:16 UTC
[gobblin] branch master updated: [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bf3d21210 [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)
bf3d21210 is described below
commit bf3d21210bdb647e5dfab395fa35b2afa7317688
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Wed Mar 29 08:31:09 2023 -0700
[GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl. (#3665)
* [GOBBLIN-1804] Merge similar logic between `FlowConfig{,V2}ResourceLocalHandler.update` into single base class impl.
* Correct `String.format` interpolation syntax, which differs from that of slf4j
---
.../service/FlowConfigResourceLocalHandler.java | 64 ++++++++++++++-
.../service/FlowConfigV2ResourceLocalHandler.java | 93 ----------------------
2 files changed, 60 insertions(+), 97 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 272a8eeab..6e7b9b082 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -19,13 +19,21 @@ package org.apache.gobblin.service;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -50,6 +58,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -188,19 +197,27 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
flowConfig = originalFlowConfig;
}
+
+ FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+ Map<String, AddSpecResponse> responseMap;
try {
- this.flowCatalog.update(createFlowSpecForConfig(flowConfig), triggerListener, modifiedWatermark);
+ responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
- log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
+ log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
}
- return new UpdateResponse(HttpStatus.S_200_OK);
+
+ if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
+ return new UpdateResponse(HttpStatus.S_200_OK);
+ } else {
+ throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
+ }
}
- protected final boolean isUnscheduleRequest(FlowConfig flowConfig) {
+ private boolean isUnscheduleRequest(FlowConfig flowConfig) {
return Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "false"));
}
@@ -305,4 +322,43 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
}
}
+
+ protected String getErrorMessage(FlowSpec flowSpec) {
+ StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
+ Map<String, ArrayList<String>> allErrors = new HashMap<>();
+
+ if (!flowSpec.getCompilationErrors().isEmpty()) {
+ message.append(" Compilation errors encountered (Sorted by relevance): ");
+ FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
+ Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
+ int errorIdSingleHop = 1;
+ int errorIdMultiHop = 1;
+
+ ArrayList<String> singleHopErrors = new ArrayList<>();
+ ArrayList<String> multiHopErrors = new ArrayList<>();
+
+ for (FlowSpec.CompilationError error: errors) {
+ if (error.errorPriority == 0) {
+ singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+ errorIdSingleHop++;
+ } else {
+ multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
+ errorIdMultiHop++;
+ }
+ }
+
+ allErrors.put("singleHopErrors", singleHopErrors);
+ allErrors.put("multiHopErrors", multiHopErrors);
+ }
+
+ allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ return mapper.writeValueAsString(allErrors);
+ } catch (JsonProcessingException e) {
+ log.error(String.format("FlowSpec %s errored on Json processing", flowSpec.toString()), e);
+ }
+ return "Could not form JSON in " + getClass().getSimpleName();
+ }
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 791721f60..cd1447ad5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,13 +16,6 @@
*/
package org.apache.gobblin.service;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Hashtable;
import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -115,92 +108,6 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
return new CreateKVResponse<>(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
}
- private String getErrorMessage(FlowSpec flowSpec) {
- StringBuilder message = new StringBuilder("Flow was not compiled successfully.");
- Hashtable<String, ArrayList<String>> allErrors = new Hashtable<>();
-
- if (!flowSpec.getCompilationErrors().isEmpty()) {
- message.append(" Compilation errors encountered (Sorted by relevance): ");
- FlowSpec.CompilationError[] errors = flowSpec.getCompilationErrors().stream().distinct().toArray(FlowSpec.CompilationError[]::new);
- Arrays.sort(errors, Comparator.comparingInt(c -> ((FlowSpec.CompilationError)c).errorPriority));
- int errorIdSingleHop = 1;
- int errorIdMultiHop = 1;
-
- ArrayList<String> singleHopErrors = new ArrayList<>();
- ArrayList<String> multiHopErrors = new ArrayList<>();
-
- for (FlowSpec.CompilationError error: errors) {
- if (error.errorPriority == 0) {
- singleHopErrors.add(String.format("ERROR %s of single-step data movement: ", errorIdSingleHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
- errorIdSingleHop++;
- } else {
- multiHopErrors.add(String.format("ERROR %s of multi-step data movement: ", errorIdMultiHop) + error.errorMessage.replace("\n", " ").replace("\t", ""));
- errorIdMultiHop++;
- }
- }
-
- allErrors.put("singleHopErrors", singleHopErrors);
- allErrors.put("multiHopErrors", multiHopErrors);
- }
-
- allErrors.put("message", new ArrayList<>(Collections.singletonList(message.toString())));
- ObjectMapper mapper = new ObjectMapper();
-
- try {
- return mapper.writeValueAsString(allErrors);
- } catch (JsonProcessingException e) {
- log.error("Flow Spec {} errored on Json processing", flowSpec.toString(), e);
- e.printStackTrace();
- }
- return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
- }
-
- /**
- * Update flowConfig locally and trigger all listeners iff @param triggerListener is set to true
- */
- @Override
- public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener, long modifiedWatermark) {
- log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
-
- if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
- throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
- "flowName and flowGroup cannot be changed in update", null);
- }
-
- FlowConfig originalFlowConfig = getFlowConfig(flowId);
-
- if (!flowConfig.getProperties().containsKey(RequesterService.REQUESTER_LIST)) {
- // Carry forward the requester list property if it is not being updated since it was added at time of creation
- flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST));
- }
-
- if (isUnscheduleRequest(flowConfig)) {
- // flow config is not changed if it is just a request to un-schedule
- originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
- flowConfig = originalFlowConfig;
- }
-
- FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- Map<String, AddSpecResponse> responseMap;
- try {
- responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
- } catch (QuotaExceededException e) {
- throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
- } catch (Throwable e) {
- // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
- log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
- throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
- return new UpdateResponse(HttpStatus.S_200_OK);
- } else {
- throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
- }
- }
-
-
-
/**
* Note: this method is only implemented for testing, normally partial update would be called in
* GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig