You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/08 03:50:31 UTC

[linkis] branch dev-1.3.2 updated: [Feature] Spark once example (#4335)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new c42edf500 [Feature] Spark once example (#4335)
c42edf500 is described below

commit c42edf5001428abcda6638459f93caeb106bae59
Author: rarexixi <ra...@gmail.com>
AuthorDate: Wed Mar 8 11:50:25 2023 +0800

    [Feature] Spark once example (#4335)
---
 .../computation/client/SparkOnceJobTest.java       | 184 +++++++++++++++++----
 1 file changed, 150 insertions(+), 34 deletions(-)

diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
index 963c8569f..cc7ab796d 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
@@ -19,8 +19,11 @@ package org.apache.linkis.computation.client;
 
 import org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
 import org.apache.linkis.computation.client.operator.impl.*;
+import org.apache.linkis.ujes.client.exception.UJESJobException;
 
+import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +53,7 @@ public class SparkOnceJobTest {
             .addJobContent("spark.app.args", "hdfs:///tmp/log.log -a 10 -b=12")
             .addJobContent(
                 "spark.extconf", "spark.a=d\nspark.c=d\nspark.args.start_date=2022-06-14")
-            .addLabel("engineType", engineType + "-2.4.7")
+            .addLabel("engineType", engineType + "-2.4.3")
             .addLabel("userCreator", submitUser + "-IDE")
             .addLabel("engineConnMode", "once")
             .addStartupParam("spark.app.name", "spark-submit-jar-test-xi")
@@ -64,64 +67,177 @@ public class SparkOnceJobTest {
             .build();
     // endregion
     onceJob.submit();
-    logger.info("jobId: {}", onceJob.getId());
+
+    // test
+    // Kill after 10s
+    // new Thread(() -> {
+    //     try {
+    //         Thread.sleep(10000);
+    //     } catch (InterruptedException e) {
+    //         throw new RuntimeException(e);
+    //     }
+    //     onceJob.kill();
+    // }).start();
+
+    // Method 1: wait for completion
+    waitComplete(onceJob);
+
+    // Method 2: wait for completion and print the log
+    // waitForCompletedAndPrintLog(onceJob);
+  }
+
+  static void printProgress(SubmittableSimpleOnceJob onceJob) { // starts a polling thread and returns immediately
+    // Look up the progress operator on the job once; the polling thread below reuses it.
+    EngineConnProgressOperator progressOperator =
+        (EngineConnProgressOperator)
+            onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME());
+    new Thread(
+            () -> {
+              while (true) { // NOTE(review): exits only via the break below; never ends if progress stays < 1
+                try {
+                  EngineConnProgressInfo engineConnProgressInfo2 =
+                      (EngineConnProgressInfo) progressOperator.apply();
+                  System.out.println("progress: " + engineConnProgressInfo2.progress());
+                  if (engineConnProgressInfo2.progress() >= 1) break; // progress of 1 or more ends the polling
+                } catch (Exception e) { // a failed poll is only reported; the next iteration tries again
+                  System.out.println("progress: error");
+                }
+                try {
+                  Thread.sleep(1000); // pause one second between polls
+                } catch (InterruptedException e) { // NOTE(review): interrupt is swallowed, so interrupting cannot stop this loop
+                }
+              }
+            })
+        .start();
+  }
+
+  static void waitComplete(SubmittableSimpleOnceJob onceJob) throws InterruptedException {
+    // Blocks until the job completes, is reported killed, or the retry budgets below run out.
+    // waitForCompleted() may throw (the client side can be interrupted), so it is retried in a
+    // loop; the loop gives up after 10 failed attempts.
+    boolean finished = false;
+    int retryTimes = 10;
+    boolean killed = false;
+    while (!finished) {
+      try {
+        onceJob.waitForCompleted();
+        finished = true;
+      } catch (Exception exception) {
+        if (exception instanceof UJESJobException) {
+          UJESJobException ujesJobException = (UJESJobException) exception;
+          if (ujesJobException.getDesc().endsWith("is killed!")) { // a kill is detected by the description suffix
+            killed = true;
+            break;
+          }
+        }
+        if (--retryTimes == 0) { // retry budget exhausted; leave the loop with finished still false
+          break;
+        }
+      }
+    }
+
+    if (killed) {
+      System.out.println("Job has been killed");
+      return;
+    }
+
+    System.out.println(LocalDateTime.now() + "get complete: job status " + onceJob.getStatus()); // NOTE(review): no separator after the timestamp
+    boolean complete = onceJob.isCompleted(onceJob.getStatus());
+    // Re-check completion from the current status, then poll isCompleted() until it confirms.
+    // TODO: other exception types may need their own retry limit here.
+    int i = 0;
+    // At most 10 polls; if they keep failing, the connection may have been lost.
+    while (!complete && i++ < 10) {
+      try {
+        complete = onceJob.isCompleted(); // NOTE(review): a plain false result re-polls immediately; only the catch path sleeps
+      } catch (Exception e) {
+        logger.error("Get isCompleted error, retry after 2s");
+        Thread.sleep(2000);
+      }
+    }
+  }
+
+  static void waitForCompletedAndPrintLog(SubmittableSimpleOnceJob onceJob)
+      throws InterruptedException { // prints engine logs page by page, then waits for and prints the final status
 
     EngineConnLogOperator logOperator =
         (EngineConnLogOperator) onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME());
     int fromLine = 1;
     int logSize = 1000; // log lines
     logOperator.setPageSize(logSize);
-    logOperator.setEngineConnType("spark");
+    logOperator.setEngineConnType("spark_jar");
 
     ArrayList<String> logLines;
-
+    EngineConnLogs logs;
+    int emptyLogsCount = 0; // consecutive empty log pages seen so far
     // print log
     while (true) {
-      try {
-        logOperator.setFromLine(fromLine);
-        EngineConnLogs logs = (EngineConnLogs) logOperator.apply();
-        logLines = logs.logs();
-        if (logLines == null || logLines.isEmpty()) {
-          if (!isCompleted(onceJob, 3)) {
-            Thread.sleep(2000);
-            continue;
-          } else {
-            break;
-          }
+      // Request the next page, starting at the first line not yet printed.
+      logOperator.setFromLine(fromLine);
+      Supplier<EngineConnLogs> supplier =
+          () -> {
+            onceJob.isCompleted(); // result discarded; NOTE(review): presumably called so a failing status query triggers the retry - confirm
+            return (EngineConnLogs) logOperator.apply();
+          };
+      logs = retry(supplier, 3, 2000, null, "Get Logs error"); // up to 3 attempts, 2000 ms apart, null as the fallback
+      if (logs == null) { // every attempt threw (or the operator itself returned null): stop reading logs
+        break;
+      }
+
+      if ((logLines = logs.logs()) == null || logLines.isEmpty()) {
+        emptyLogsCount++;
+        // Ten consecutive empty pages: the back-end engine may have been killed, so stop polling.
+        if (emptyLogsCount == 10) {
+          break;
         }
-        for (String log : logLines) {
-          System.out.println(log);
+        // No new log lines: check whether the job is done; if not, sleep 2 seconds and poll again.
+        // isCompleted() can fail (e.g. network errors): retry 3 times, defaulting to true if all fail.
+        boolean completed = retry(onceJob::isCompleted, 3, 2000, true, "isCompleted error");
+        if (!completed) {
+          Thread.sleep(2000);
+          continue;
+        } else {
+          break;
         }
-        fromLine += logLines.size();
-      } catch (Exception e) {
-        logger.error("Failed to get log information", e);
-        break;
       }
+      emptyLogsCount = 0; // got lines, so reset the consecutive-empty counter
+      for (String log : logLines) {
+        // print log
+        System.out.println(log);
+      }
+      // Advance the start line past what was just printed.
+      fromLine += logLines.size();
     }
-
-    boolean complete = false;
-    // wait complete
-    while (!complete) {
+    System.out.println(LocalDateTime.now() + "get complete: job status " + onceJob.getStatus()); // NOTE(review): no separator after the timestamp
+    boolean complete = onceJob.isCompleted(onceJob.getStatus());
+    // Re-check completion from the current status, then poll isCompleted() at most 10 times.
+    // TODO: other exception types may need their own retry limit here.
+    int i = 0;
+    while (!complete && i++ < 10) {
       try {
         complete = onceJob.isCompleted();
       } catch (Exception e) {
-        logger.error("isCompleted error", e);
+        logger.error("Get isCompleted error, retry after 2s");
+        Thread.sleep(2000); // NOTE(review): only the failure path sleeps; a plain false result re-polls immediately
       }
     }
-
     String finalStatus = onceJob.getStatus();
-    logger.info("final status " + finalStatus);
+    // A status of "Running" at this point suggests the program may have terminated abnormally;
+    // it is reported as Cancelled below (Lost or Killed would be alternative labels).
+    if ("Running".equals(finalStatus)) finalStatus = "Cancelled";
+    System.out.println(LocalDateTime.now() + "finalStatus, " + finalStatus);
   }
 
-  static boolean isCompleted(SubmittableSimpleOnceJob onceJob, int times)
+  static <R> R retry( // calls supplier up to `times` times and returns its first result that does not throw
+      Supplier<R> supplier, int times, int retryDuration, R finalDefaultResult, String errMsg)
       throws InterruptedException {
-    if (times == 0) return false;
+    if (times <= 0) return finalDefaultResult; // attempts exhausted: fall back to the caller-supplied default
     try {
-      return onceJob.isCompleted();
+      return supplier.get();
     } catch (Exception e) {
-      logger.error("isCompleted error", e);
-      Thread.sleep(2000);
-      return isCompleted(onceJob, times - 1);
+      logger.error(errMsg); // NOTE(review): the caught exception is not passed to the logger, so its stack trace is dropped
+      Thread.sleep(retryDuration); // milliseconds to wait before the next attempt (also runs after the last failure)
+      return retry(supplier, times - 1, retryDuration, finalDefaultResult, errMsg); // recurse with one fewer attempt
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org