You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/16 05:29:38 UTC

[zeppelin] branch master updated: [ZEPPELIN-4873]. Display rich duration info for insert into flink job

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3139ed6  [ZEPPELIN-4873]. Display rich duration info for insert into flink job
3139ed6 is described below

commit 3139ed6adbbb97246dd4c36ee1505755ed2df0fc
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 15 15:39:16 2020 +0800

    [ZEPPELIN-4873]. Display rich duration info for insert into flink job
    
    ### What is this PR for?
    
    Trivial PR which display rich duration info instead of just x seconds. See screenshot below.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4873
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/84286308-19291b80-ab71-11ea-96ef-b237d2463b8c.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3794 from zjffdu/ZEPPELIN-4873 and squashes the following commits:
    
    459f181c5 [Jeff Zhang] add java doc
    419818e4a [Jeff Zhang] address comment
    9691eae82 [Jeff Zhang] [ZEPPELIN-4873]. Display rich duration info for insert into flink job
---
 .../java/org/apache/zeppelin/flink/JobManager.java | 63 +++++++++++++++++-----
 .../org/apache/zeppelin/flink/JobManagerTest.java  | 43 +++++++++++++++
 2 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index ccbfe39..914d8a6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -29,9 +29,11 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class JobManager {
@@ -110,7 +112,7 @@ public class JobManager {
   }
 
   public void cancelJob(InterpreterContext context) throws InterpreterException {
-    LOGGER.info("Canceling job associated of paragraph: "+ context.getParagraphId());
+    LOGGER.info("Canceling job associated of paragraph: {}", context.getParagraphId());
     JobClient jobClient = this.jobs.get(context.getParagraphId());
     if (jobClient == null) {
       LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph",
@@ -178,8 +180,12 @@ public class JobManager {
     @Override
     public void run() {
       while (!Thread.currentThread().isInterrupted() && running.get()) {
+
         JsonNode rootNode = null;
         try {
+          synchronized (running) {
+            running.wait(1000);
+          }
           rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
                   .asJson().getBody();
           JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
@@ -200,13 +206,12 @@ public class JobManager {
           if (jobState.equalsIgnoreCase("finished")) {
             break;
           }
-          synchronized (running) {
-            running.wait(1000);
-          }
+          long duration = rootNode.getObject().getLong("duration") / 1000;
+
           if (isStreamingInsertInto) {
             if (isFirstPoll) {
               StringBuilder builder = new StringBuilder("%angular ");
-              builder.append("<h1>Duration: {{duration}} seconds");
+              builder.append("<h1>Duration: {{duration}} </h1>");
               builder.append("\n%text ");
               context.out.clear(false);
               context.out.write(builder.toString());
@@ -214,7 +219,7 @@ public class JobManager {
               isFirstPoll = false;
             }
             context.getAngularObjectRegistry().add("duration",
-                    rootNode.getObject().getLong("duration") / 1000,
+                    toRichTimeDuration(duration),
                     context.getNoteId(),
                     context.getParagraphId());
           }
@@ -224,15 +229,45 @@ public class JobManager {
       }
     }
 
-      public void cancel () {
-        this.running.set(false);
-        synchronized (running) {
-          running.notify();
-        }
+    public void cancel() {
+      this.running.set(false);
+      synchronized (running) {
+        running.notify();
       }
+    }
 
-      public int getProgress () {
-        return progress;
-      }
+    public int getProgress() {
+      return progress;
     }
   }
+
+  /**
+   * Convert duration in seconds to rich time duration format. e.g. 2 days 3 hours 4 minutes 5 seconds
+   *
+   * @param duration in second
+   * @return
+   */
+  static String toRichTimeDuration(long duration) {
+    long days = TimeUnit.SECONDS.toDays(duration);
+    duration -= TimeUnit.DAYS.toSeconds(days);
+    long hours = TimeUnit.SECONDS.toHours(duration);
+    duration -= TimeUnit.HOURS.toSeconds(hours);
+    long minutes = TimeUnit.SECONDS.toMinutes(duration);
+    duration -= TimeUnit.MINUTES.toSeconds(minutes);
+    long seconds = TimeUnit.SECONDS.toSeconds(duration);
+
+    StringBuilder builder = new StringBuilder();
+    if (days != 0) {
+      builder.append(days + " days ");
+    }
+    if (days != 0 || hours != 0) {
+      builder.append(hours + " hours ");
+    }
+    if (days != 0 || hours != 0 || minutes != 0) {
+      builder.append(minutes + " minutes ");
+    }
+    builder.append(seconds + " seconds");
+    return builder.toString();
+  }
+
+}
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
new file mode 100644
index 0000000..fe196e9
--- /dev/null
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerTest {
+
+  @Test
+  public void testRichDuration() {
+    String richDuration = JobManager.toRichTimeDuration(18);
+    assertEquals("18 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(120);
+    assertEquals("2 minutes 0 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(60 * 60 + 1);
+    assertEquals("1 hours 0 minutes 1 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1);
+    assertEquals("1 hours 1 minutes 1 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1);
+    assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration);
+  }
+}