You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/07/15 19:35:19 UTC

[gobblin] branch master updated: [GOBBLIN-1458] Add endpoint to trigger a flow in GaaS (#3297)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 77c2b95  [GOBBLIN-1458] Add endpoint to trigger a flow in GaaS (#3297)
77c2b95 is described below

commit 77c2b95b2a106e83011eade119f88e900a07fa11
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Jul 15 12:35:13 2021 -0700

    [GOBBLIN-1458] Add endpoint to trigger a flow in GaaS (#3297)
    
    Previously the way to manually trigger an execution of a scheduled flow is to send a partial update that sets the flow's runImmediately property to true. This PR improves the API by adding a restli action that sends the same partial update.
    
    Also copied the implementation of partialUpdateFlowConfig to FlowConfigV2ResourceLocalHandler for testing convenience (normally when running the service GobblinServiceFlowConfigResourceHandler would be used, but for tests it is FlowConfigV2ResourceLocalHandler).
---
 ...che.gobblin.service.flowconfigsV2.restspec.json |  7 ++-
 ...che.gobblin.service.flowconfigsV2.snapshot.json |  7 ++-
 .../apache/gobblin/service/FlowConfigV2Client.java | 14 ++++++
 .../apache/gobblin/service/FlowConfigV2Test.java   | 51 +++++++++++++++-------
 .../service/FlowConfigV2ResourceLocalHandler.java  | 16 ++++++-
 .../gobblin/service/FlowConfigsV2Resource.java     | 22 ++++++++++
 6 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index 5a7ccb8..06f5727 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -79,7 +79,12 @@
       } ]
     } ],
     "entity" : {
-      "path" : "/flowconfigsV2/{id}"
+      "path" : "/flowconfigsV2/{id}",
+      "actions" : [ {
+        "name" : "runImmediately",
+        "doc" : "Trigger a new execution of an existing flow",
+        "returns" : "string"
+      } ]
     }
   }
 }
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 2442bd9..a45eab9 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -180,7 +180,12 @@
         } ]
       } ],
       "entity" : {
-        "path" : "/flowconfigsV2/{id}"
+        "path" : "/flowconfigsV2/{id}",
+        "actions" : [ {
+          "name" : "runImmediately",
+          "doc" : "Trigger a new execution of an existing flow",
+          "returns" : "string"
+        } ]
       }
     }
   }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index cb35e8f..68b04bd 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -36,6 +36,7 @@ import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.r2.transport.common.Client;
 import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.ActionRequest;
 import com.linkedin.restli.client.CreateIdEntityRequest;
 import com.linkedin.restli.client.DeleteRequest;
 import com.linkedin.restli.client.FindRequest;
@@ -255,6 +256,19 @@ public class FlowConfigV2Client implements Closeable {
     response.getResponse();
   }
 
+  public String runImmediately(FlowId flowId)
+      throws RemoteInvocationException {
+    LOG.debug("runImmediately with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
+
+    ActionRequest<String> runImmediatelyRequest = _flowconfigsV2RequestBuilders.actionRunImmediately()
+        .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
+
+    Response<String> response = (Response<String>) FlowClientUtils.sendRequestWithRetry(_restClient.get(), runImmediatelyRequest,
+        FlowconfigsV2RequestBuilders.getPrimaryResource());
+
+    return response.getEntity();
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index bf96f0a..fa4ad11 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -46,7 +46,6 @@ import com.linkedin.restli.client.RestLiResponseException;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.internal.server.util.DataMapUtils;
 import com.linkedin.restli.server.resources.BaseResource;
-import com.linkedin.restli.server.util.PatchApplier;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -85,6 +84,9 @@ public class FlowConfigV2Test {
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
 
+  private static final ServiceRequester TEST_REQUESTER = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
+  private static final ServiceRequester TEST_REQUESTER2 = new ServiceRequester("testName2", "USER_PRINCIPAL", "testFrom");
+
   @BeforeClass
   public void setUp() throws Exception {
     ConfigBuilder configBuilder = ConfigBuilder.create();
@@ -168,6 +170,7 @@ public class FlowConfigV2Test {
   @Test
   public void testPartialUpdate() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
+    _requesterService.setRequester(TEST_REQUESTER);
 
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1");
@@ -187,9 +190,7 @@ public class FlowConfigV2Test {
     DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
     PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
 
-    PatchApplier.applyPatch(flowConfig, flowConfigPatch);
-
-    _client.updateFlowConfig(flowConfig);
+    _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
 
     FlowConfig retrievedFlowConfig = _client.getFlowConfig(flowId);
 
@@ -273,8 +274,7 @@ public class FlowConfigV2Test {
 
   @Test
   public void testGroupUpdateRejected() throws Exception {
-   ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
-   _requesterService.setRequester(testRequester);
+   _requesterService.setRequester(TEST_REQUESTER);
    Map<String, String> flowProperties = Maps.newHashMap();
 
    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
@@ -296,9 +296,7 @@ public class FlowConfigV2Test {
 
   @Test
   public void testRequesterUpdate() throws Exception {
-    ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
-    ServiceRequester testRequester2 = new ServiceRequester("testName2", "USER_PRINCIPAL", "testFrom");
-    _requesterService.setRequester(testRequester);
+    _requesterService.setRequester(TEST_REQUESTER);
     Map<String, String> flowProperties = Maps.newHashMap();
 
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_8);
@@ -310,22 +308,20 @@ public class FlowConfigV2Test {
     _client.createFlowConfig(flowConfig);
 
     // testName2 takes ownership of the flow
-    flowProperties.put(RequesterService.REQUESTER_LIST, RequesterService.serialize(Lists.newArrayList(testRequester2)));
+    flowProperties.put(RequesterService.REQUESTER_LIST, RequesterService.serialize(Lists.newArrayList(TEST_REQUESTER2)));
     flowConfig.setProperties(new StringMap(flowProperties));
-    _requesterService.setRequester(testRequester2);
+    _requesterService.setRequester(TEST_REQUESTER2);
     _client.updateFlowConfig(flowConfig);
 
     // Check that the requester list was actually updated
     FlowConfig updatedFlowConfig = _client.getFlowConfig(flowId);
     Assert.assertEquals(RequesterService.deserialize(updatedFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST)),
-        Lists.newArrayList(testRequester2));
+        Lists.newArrayList(TEST_REQUESTER2));
   }
 
   @Test
   public void testRequesterUpdateRejected() throws Exception {
-    ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
-    ServiceRequester testRequester2 = new ServiceRequester("testName2", "USER_PRINCIPAL", "testFrom");
-    _requesterService.setRequester(testRequester);
+    _requesterService.setRequester(TEST_REQUESTER);
     Map<String, String> flowProperties = Maps.newHashMap();
 
     FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_9))
@@ -335,7 +331,7 @@ public class FlowConfigV2Test {
     _client.createFlowConfig(flowConfig);
 
     // Update should be rejected because testName is not allowed to update the owner to testName2
-    flowProperties.put(RequesterService.REQUESTER_LIST, RequesterService.serialize(Lists.newArrayList(testRequester2)));
+    flowProperties.put(RequesterService.REQUESTER_LIST, RequesterService.serialize(Lists.newArrayList(TEST_REQUESTER2)));
     flowConfig.setProperties(new StringMap(flowProperties));
     try {
       _client.updateFlowConfig(flowConfig);
@@ -375,6 +371,29 @@ public class FlowConfigV2Test {
     Assert.fail();
   }
 
+  @Test
+  public void testRunFlow() throws Exception {
+    String flowName = "testRunFlow";
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(flowName);
+    _requesterService.setRequester(TEST_REQUESTER);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(flowName))
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+
+    // Create initial flowConfig
+    _client.createFlowConfig(flowConfig);
+
+    // Trigger flow
+    _client.runImmediately(flowId);
+
+    // Verify runImmediately was changed to true
+    Assert.assertTrue(_client.getFlowConfig(flowId).getSchedule().isRunImmediately());
+  }
+
   @AfterClass(alwaysRun = true)
   public void tearDown() throws Exception {
     if (_client != null) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 289ab89..70d82d5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -21,12 +21,14 @@ import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
+import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
 
 import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
@@ -97,8 +99,20 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
   }
 
+  /**
+   * Note: this method is only implemented for testing, normally partial update would be called in
+   * GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig
+   */
   @Override
   public UpdateResponse partialUpdateFlowConfig(FlowId flowId, PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
-    throw new UnsupportedOperationException("Partial update only supported by GobblinServiceFlowConfigResourceHandler");
+    FlowConfig flowConfig = getFlowConfig(flowId);
+
+    try {
+      PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+    } catch (DataProcessingException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to apply partial update", e);
+    }
+
+    return updateFlowConfig(flowId, flowConfig);
   }
 }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 00b49f7..1aebb75 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -17,27 +17,35 @@
 package org.apache.gobblin.service;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
+import com.linkedin.data.DataMap;
 import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.internal.server.util.DataMapUtils;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.PathKeys;
+import com.linkedin.restli.server.ResourceLevel;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Action;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
 import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.PathKeysParam;
 import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.annotations.ReturnEntity;
@@ -197,6 +205,20 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
     return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders());
   }
 
+  /**
+   * Trigger a new execution of an existing flow
+   * @param pathKeys key of {@link FlowId} specified in path
+   */
+  @Action(name="runImmediately", resourceLevel=ResourceLevel.ENTITY)
+  public String runImmediately(@PathKeysParam PathKeys pathKeys) {
+    String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson, Charset.defaultCharset()));
+    PatchRequest<FlowConfig> flowConfigPatch = PatchRequest.createFromPatchDocument(dataMap);
+    ComplexResourceKey<FlowId, FlowStatusId> id = pathKeys.get("id");
+    update(id, flowConfigPatch);
+    return "Successfully triggered flow " + id.getKey().toString();
+  }
+
   private FlowConfigsResourceHandler getFlowConfigResourceHandler() {
     if (global_flowConfigsResourceHandler != null) {
       return global_flowConfigsResourceHandler;