You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/24 20:42:37 UTC

[gobblin] branch master updated: [GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 549462f4a [GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)
549462f4a is described below

commit 549462f4a90c31f84e7a2aab6b8aee144413a7ce
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Fri Mar 24 13:42:29 2023 -0700

    [GOBBLIN-1804] Reject flow config updates that would fail compilation by returning service error (#3664)
---
 .../apache/gobblin/service/FlowConfigV2Test.java   | 85 +++++++++++++++++++++-
 .../service/FlowConfigResourceLocalHandler.java    |  2 +-
 .../service/FlowConfigV2ResourceLocalHandler.java  | 47 ++++++++++++
 3 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index bc16e3bad..b205ab244 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -20,10 +20,12 @@ package org.apache.gobblin.service;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.mortbay.jetty.HttpStatus;
+import org.mockito.ArgumentMatchers;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -31,6 +33,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
@@ -52,6 +55,7 @@ import lombok.Setter;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -70,6 +74,7 @@ public class FlowConfigV2Test {
   private TestRequesterService _requesterService;
   private GroupOwnershipService groupOwnershipService;
   private File groupConfigFile;
+  private Set<String> _compilationFailureFlowPaths = Sets.newHashSet();
 
   private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
   private static final String TEST_GROUP_NAME = "testGroup1";
@@ -82,6 +87,8 @@ public class FlowConfigV2Test {
   private static final String TEST_FLOW_NAME_7 = "testFlow7";
   private static final String TEST_FLOW_NAME_8 = "testFlow8";
   private static final String TEST_FLOW_NAME_9 = "testFlow9";
+  private static final String TEST_FLOW_NAME_10 = "testFlow10";
+  private static final String TEST_FLOW_NAME_11 = "testFlow11";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
 
@@ -103,7 +110,11 @@ public class FlowConfigV2Test {
     final FlowCatalog flowCatalog = new FlowCatalog(config);
     final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
     when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
-    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    // NOTE: more general `ArgumentMatchers` (indicating compilation unsuccessful) must precede the specific
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
+    when(mockListener.onAddSpec(ArgumentMatchers.argThat((FlowSpec flowSpec) -> {
+      return !_compilationFailureFlowPaths.contains(flowSpec.getUri().getPath());
+    }))).thenReturn(new AddSpecResponse(""));
     flowCatalog.addListener(mockListener);
     flowCatalog.startAsync();
     flowCatalog.awaitRunning();
@@ -168,6 +179,31 @@ public class FlowConfigV2Test {
     Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(), -1L);
   }
 
+  @Test
+  public void testCreateRejectedWhenFailsCompilation() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_10); // NOTE(review): never read in this test; the config below builds its own FlowId
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_10))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // inform mock that this flow should fail compilation: the mock listener's success stub only matches flow paths absent from this set
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, TEST_FLOW_NAME_10));
+    try {
+      _client.createFlowConfig(flowConfig);
+      Assert.fail("create seemingly accepted (despite anticipated flow compilation failure)");
+    } catch (RestLiResponseException e) {
+      // the rejection is expected to surface as HTTP 400 carrying the compilation-failure message
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled successfully."));
+    }
+  }
+
   @Test
   public void testPartialUpdate() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
@@ -202,7 +238,7 @@ public class FlowConfigV2Test {
   }
 
   @Test (expectedExceptions = RestLiResponseException.class)
-  public void testBadPartialUpdate() throws Exception {
+  public void testPartialUpdateNotPossibleWithoutCreateFirst() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
 
     String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
@@ -210,10 +246,53 @@ public class FlowConfigV2Test {
     DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
     PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
 
-    // Throws exception since local handlers don't support partial update
+    // Throws exception since flow was not created first, prior to partial update
     _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
   }
 
+  @Test
+  public void testPartialUpdateRejectedWhenFailsCompilation() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_11);
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put("param2", "value2");
+    flowProperties.put("param3", "value3");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_11))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // Set some initial config (done before the flow path is registered as failing, so this create is expected to succeed)
+    _client.createFlowConfig(flowConfig);
+
+    // Patch: set runImmediately=true, change param2 to value4, delete param3, add param5=value5
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
+        + "\"properties\":{\"$set\":{\"param2\":\"value4\",\"param5\":\"value5\"},\"$delete\":[\"param3\"]}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
+
+    // inform mock that this flow should hereafter fail compilation
+    _compilationFailureFlowPaths.add(String.format("/%s/%s", TEST_GROUP_NAME, TEST_FLOW_NAME_11));
+    try {
+      _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
+      Assert.fail("update seemingly accepted (despite anticipated flow compilation failure)");
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_400_Bad_Request);
+      Assert.assertTrue(e.getMessage().contains("Flow was not compiled successfully."));
+    }
+
+    // verify that the prior state of the flow config is retained, i.e. the rejected update had no effect
+    FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
+
+    Assert.assertTrue(!retrievedFlowConfig.getSchedule().isRunImmediately());
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), "value1");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), "value2");
+    Assert.assertEquals(retrievedFlowConfig.getProperties().get("param3"), "value3");
+    Assert.assertFalse(retrievedFlowConfig.getProperties().containsKey("param5"));
+  }
+
   @Test
   public void testDisallowedRequester() throws Exception {
     try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index f97b1b915..272a8eeab 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -200,7 +200,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     return new UpdateResponse(HttpStatus.S_200_OK);
   }
 
-  private boolean isUnscheduleRequest(FlowConfig flowConfig) {
+  protected final boolean isUnscheduleRequest(FlowConfig flowConfig) {
     return Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "false"));
   }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b20b83346..791721f60 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -154,6 +154,53 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     }
     return "Could not form JSON in FlowConfigV2ResourceLocalHandler";
   }
+
+  /**
+   * Update flowConfig locally and trigger all listeners iff {@code triggerListener} is true; throws a 400 {@link RestLiServiceException} when the catalog does not report successful compilation
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener, long modifiedWatermark) {
+    log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
+
+    if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    FlowConfig originalFlowConfig = getFlowConfig(flowId);
+
+    if (!flowConfig.getProperties().containsKey(RequesterService.REQUESTER_LIST)) {
+      // Carry forward the requester list property if it is not being updated since it was added at time of creation
+      flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST)); // NOTE(review): value is null if the original config lacks the key — confirm put() tolerates that
+    }
+
+    if (isUnscheduleRequest(flowConfig)) {
+      // for an un-schedule request the incoming config is discarded: the original config is reused with only its schedule replaced
+      originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
+      flowConfig = originalFlowConfig;
+    }
+
+    FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
+    Map<String, AddSpecResponse> responseMap;
+    try {
+      responseMap = this.flowCatalog.update(flowSpec, triggerListener, modifiedWatermark);
+    } catch (QuotaExceededException e) {
+      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+    } catch (Throwable e) {
+      // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+      log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e); // NOTE(review): message says "add" although this is the update path
+      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); // NOTE(review): only the message is forwarded; the cause is not attached
+    }
+
+    if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) { // a missing COMPILATION_SUCCESSFUL entry is treated as "false"
+      return new UpdateResponse(HttpStatus.S_200_OK);
+    } else {
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec));
+    }
+  }
+
+
+
   /**
    * Note: this method is only implemented for testing, normally partial update would be called in
    * GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig