You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2019/02/20 19:08:29 UTC

[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633224
 
 

 ##########
 File path: gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##########
 @@ -402,97 +329,71 @@ public AzkabanExecuteFlowStatus executeFlowWithOptions(
    *
    * @return The status object which contains success status and execution id.
    */
-  public AzkabanExecuteFlowStatus executeFlow(
-      String projectName,
-      String flowName,
-      Map<String, String> flowParameters) {
+  public AzkabanExecuteFlowStatus executeFlow(String projectName,
+                                              String flowName,
+                                              Map<String, String> flowParameters) throws AzkabanClientException {
     return executeFlowWithOptions(projectName, flowName, null, flowParameters);
   }
 
   /**
    * Cancel a flow by execution id.
    */
-  public AzkabanClientStatus cancelFlow(int execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, String.valueOf(execId)));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        handleResponse(response);
-        return new AzkabanClientStatus.SUCCESS();
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanClientStatus.FAIL("", e);
-    }
+  public AzkabanClientStatus cancelFlow(String execId) throws AzkabanClientException {
+    AzkabanMultiCallables.CancelFlowCallable callable =
+        new AzkabanMultiCallables.CancelFlowCallable(this,
+            execId);
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
+  /**
+   * Fetch an execution log.
+   */
+  public AzkabanClientStatus fetchExecutionLog(String execId,
+                                               String jobId,
+                                               String offset,
+                                               String length,
+                                               File ouf) throws AzkabanClientException {
+    AzkabanMultiCallables.FetchExecLogCallable callable =
+        new AzkabanMultiCallables.FetchExecLogCallable(this,
+            execId,
+            jobId,
+            offset,
+            length,
+            ouf);
+
+    return runWithRetry(callable, AzkabanClientStatus.class);
+  }
 
   /**
-   * Given an execution id, fetches all the detailed information of that execution, including a list of all the job executions.
+   * Given an execution id, fetches all the detailed information of that execution,
+   * including a list of all the job executions.
    *
    * @param execId execution id to be fetched.
    *
-   * @return The status object which contains success status and all the detailed information of that execution.
+   * @return The status object which contains success status and all the detailed
+   *         information of that execution.
    */
-  public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
-    try {
-      refreshSession();
-      List<NameValuePair> nvps = new ArrayList<>();
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "fetchexecflow"));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, this.sessionId));
-      nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
-      Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
-      Header requestType = new BasicHeader("X-Requested-With", "XMLHttpRequest");
-
-      HttpGet httpGet = new HttpGet(url + "/executor?" + URLEncodedUtils.format(nvps, "UTF-8"));
-      httpGet.setHeaders(new Header[]{contentType, requestType});
-
-      CloseableHttpResponse response = this.httpClient.execute(httpGet);
-      try {
-        Map<String, String> map = handleResponse(response);
-        return new AzkabanFetchExecuteFlowStatus(new AzkabanFetchExecuteFlowStatus.Execution(map));
-      } finally {
-        response.close();
-      }
-    } catch (Exception e) {
-      return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
-          + "fetch execId " + execId, e);
-    }
-  }
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) throws AzkabanClientException {
+    AzkabanMultiCallables.FetchFlowExecCallable callable =
+        new AzkabanMultiCallables.FetchFlowExecCallable(this, execId);
 
-  private void addFlowParameters(List<NameValuePair> nvps, Map<String, String> flowParams) {
-    if (flowParams != null) {
-      for (Map.Entry<String, String> entry : flowParams.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
-          log.debug("New flow parameter added:" + key + "-->" + value);
-          nvps.add(new BasicNameValuePair("flowOverride[" + key + "]", value));
-        }
-      }
-    }
+    return runWithRetry(callable, AzkabanFetchExecuteFlowStatus.class);
   }
 
-  private void addFlowOptions(List<NameValuePair> nvps, Map<String, String> flowOptions) {
-    if (flowOptions != null) {
-      for (Map.Entry<String, String> option : flowOptions.entrySet()) {
-        log.debug("New flow option added:" + option .getKey()+ "-->" + option.getValue());
-        nvps.add(new BasicNameValuePair(option.getKey(), option.getValue()));
+  private <T> T runWithRetry(Callable callable, Class<T> cls) throws AzkabanClientException {
+    try {
+      AzkabanClientStatus status = this.retryer.call(callable);
 
 Review comment:
   I force a retry now. Please review the latest

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services