You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/24 20:42:37 UTC
[gobblin] branch master updated: [GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 549462f4a [GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)
549462f4a is described below
commit 549462f4a90c31f84e7a2aab6b8aee144413a7ce
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Fri Mar 24 13:42:29 2023 -0700
[GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)
---
.../apache/gobblin/service/FlowConfigV2Test.java | 85 +++++++++++++++++++++-
.../service/FlowConfigResourceLocalHandler.java | 2 +-
.../service/FlowConfigV2ResourceLocalHandler.java | 47 ++++++++++++
3 files changed, 130 insertions(+), 4 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index bc16e3bad..b205ab244 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -20,10 +20,12 @@ package org.apache.gobblin.service;
import java.io.File;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.mortbay.jetty.HttpStatus;
+import org.mockito.ArgumentMatchers;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -31,6 +33,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
@@ -52,6 +55,7 @@ import lombok.Setter;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -70,6 +74,7 @@ public class FlowConfigV2Test {
private TestRequesterService _requesterService;
private GroupOwnershipService groupOwnershipService;
private File groupConfigFile;
+ private Set<String> _compilationFailureFlowPaths = Sets.newHashSet();
private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
private static final String TEST_GROUP_NAME = "testGroup1";
@@ -82,6 +87,8 @@ public class FlowConfigV2Test {
private static final String TEST_FLOW_NAME_7 = "testFlow7";
private static final String TEST_FLOW_NAME_8 = "testFlow8";
private static final String TEST_FLOW_NAME_9 = "testFlow9";
+ private static final String TEST_FLOW_NAME_10 = "testFlow10";
+ private static final String TEST_FLOW_NAME_11 = "testFlow11";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
@@ -103,7 +110,11 @@ public class FlowConfigV2Test {
final FlowCatalog flowCatalog = new FlowCatalog(config);
final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
- when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ // NOTE: Mockito applies the most recently declared matching stub, so the general `any()` stub (indicating compilation unsuccessful) must be declared before the more specific one
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
+ when(mockListener.onAddSpec(ArgumentMatchers.argThat((FlowSpec flowSpec) -> {
+ return !_compilationFailureFlowPaths.contains(flowSpec.getUri().getPath());
+ }))).thenReturn(new AddSpecResponse(""));
flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
flowCatalog.awaitRunning();
@@ -168,6 +179,31 @@ public class FlowConfigV2Test {
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(), -1L);
}
+  /** Creating a flow whose compilation fails (as simulated by the mock listener) must be rejected with a 400 error. */
+  @Test
+  public void testCreateRejectedWhenFailsCompilation() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_10);
+    _requesterService.setRequester(TEST_REQUESTER);
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(flowId).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // inform mock that this flow should fail compilation
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, TEST_FLOW_NAME_10));
+    try {
+      _client.createFlowConfig(flowConfig);
+      Assert.fail("create seemingly accepted (despite anticipated flow compilation failure)");
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled successfully."));
+    }
+  }
+
@Test
public void testPartialUpdate() throws Exception {
FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
@@ -202,7 +238,7 @@ public class FlowConfigV2Test {
}
@Test (expectedExceptions = RestLiResponseException.class)
- public void testBadPartialUpdate() throws Exception {
+ public void testPartialUpdateNotPossibleWithoutCreateFirst() throws Exception {
FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
@@ -210,10 +246,53 @@ public class FlowConfigV2Test {
DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
- // Throws exception since local handlers don't support partial update
+ // Throws exception since flow was not created first, prior to partial update
_client.partialUpdateFlowConfig(flowId, flowConfigPatch);
}
+  /** A partial update that makes the flow fail compilation must be rejected with a 400 and leave the stored config untouched. */
+  @Test
+  public void testPartialUpdateRejectedWhenFailsCompilation() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_11);
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(flowId).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // Set some initial config
+    _client.createFlowConfig(flowConfig);
+
+    // Change param2 to value4, delete param3, add param5=value5
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+        + "\"properties\":{\"$set\":{\"param2\":\"value4\",\"param5\":\"value5\"},\"$delete\":[\"param3\"]}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
+
+    // inform mock that this flow should hereafter fail compilation
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, TEST_FLOW_NAME_11));
+    try {
+      _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
+      Assert.fail("update seemingly accepted (despite anticipated flow compilation failure)");
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled successfully."));
+    }
+
+    // verify that prior state of flow config still retained: that updates had no effect
+    FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
+    Assert.assertFalse(retrievedFlowConfig.getSchedule().isRunImmediately());
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), "value1");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), "value2");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param3"), "value3");
+    Assert.assertFalse(retrievedFlowConfig.getProperties().containsKey("param5"));
+  }
+
@Test
public void testDisallowedRequester() throws Exception {
try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index f97b1b915..272a8eeab 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -200,7 +200,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
return new UpdateResponse(HttpStatus.S_200_OK);
}
- private boolean isUnscheduleRequest(FlowConfig flowConfig) {
+ protected final boolean isUnscheduleRequest(FlowConfig flowConfig) {
return Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "false")); // a missing key defaults to "false", i.e. not an un-schedule request
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b20b83346..791721f60 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -154,6 +154,53 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
}
return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
}
+
+  /**
+   * Updates the flow config identified by {@code flowId}, rejecting the update with a 400 when the flow fails to compile.
+   * Listeners are triggered iff {@code triggerListener} is true; {@code modifiedWatermark} is passed through to the catalog.
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener, long modifiedWatermark) {
+    log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
+
+    if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    FlowConfig originalFlowConfig = getFlowConfig(flowId);
+    if (!flowConfig.getProperties().containsKey(RequesterService.REQUESTER_LIST)) {
+      // Carry forward the requester list property if it is not being updated since it was added at time of creation
+      flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST));
+    }
+
+    if (isUnscheduleRequest(flowConfig)) {
+      // flow config is not changed if it is just a request to un-schedule
+      originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
+      flowConfig = originalFlowConfig;
+    }
+
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    Map<String, AddSpecResponse> responseMap;
+    try {
+      responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
+    } catch (QuotaExceededException e) {
+      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage(), e);
+    } catch (Throwable e) {
+      // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+      log.warn("Failed to update flow configuration {}.{} in catalog due to", flowId.getFlowGroup(), flowId.getFlowName(), e);
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage(), e); // keep the cause for diagnosis
+    }
+
+    // A missing entry or a null value counts as a failed compilation (parseBoolean(null) is false) instead of raising an NPE
+    AddSpecResponse compileResponse = responseMap.get(ServiceConfigKeys.COMPILATION_SUCCESSFUL);
+    Object compiled = compileResponse == null ? null : compileResponse.getValue();
+    if (!Boolean.parseBoolean(compiled == null ? null : compiled.toString())) {
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
+    }
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+
/**
* Note: this method is only implemented for testing, normally partial update would be called in
* GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig