You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/16 05:29:38 UTC
[zeppelin] branch master updated: [ZEPPELIN-4873]. Display rich
duration info for insert into flink job
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 3139ed6 [ZEPPELIN-4873]. Display rich duration info for insert into flink job
3139ed6 is described below
commit 3139ed6adbbb97246dd4c36ee1505755ed2df0fc
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 15 15:39:16 2020 +0800
[ZEPPELIN-4873]. Display rich duration info for insert into flink job
### What is this PR for?
Trivial PR which display rich duration info instead of just x seconds. See screenshot below.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4873
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/84286308-19291b80-ab71-11ea-96ef-b237d2463b8c.png)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3794 from zjffdu/ZEPPELIN-4873 and squashes the following commits:
459f181c5 [Jeff Zhang] add java doc
419818e4a [Jeff Zhang] address comment
9691eae82 [Jeff Zhang] [ZEPPELIN-4873]. Display rich duration info for insert into flink job
---
.../java/org/apache/zeppelin/flink/JobManager.java | 63 +++++++++++++++++-----
.../org/apache/zeppelin/flink/JobManagerTest.java | 43 +++++++++++++++
2 files changed, 92 insertions(+), 14 deletions(-)
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index ccbfe39..914d8a6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -29,9 +29,11 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class JobManager {
@@ -110,7 +112,7 @@ public class JobManager {
}
public void cancelJob(InterpreterContext context) throws InterpreterException {
- LOGGER.info("Canceling job associated of paragraph: "+ context.getParagraphId());
+ LOGGER.info("Canceling job associated of paragraph: {}", context.getParagraphId());
JobClient jobClient = this.jobs.get(context.getParagraphId());
if (jobClient == null) {
LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph",
@@ -178,8 +180,12 @@ public class JobManager {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted() && running.get()) {
+
JsonNode rootNode = null;
try {
+ synchronized (running) {
+ running.wait(1000);
+ }
rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
.asJson().getBody();
JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
@@ -200,13 +206,12 @@ public class JobManager {
if (jobState.equalsIgnoreCase("finished")) {
break;
}
- synchronized (running) {
- running.wait(1000);
- }
+ long duration = rootNode.getObject().getLong("duration") / 1000;
+
if (isStreamingInsertInto) {
if (isFirstPoll) {
StringBuilder builder = new StringBuilder("%angular ");
- builder.append("<h1>Duration: {{duration}} seconds");
+ builder.append("<h1>Duration: {{duration}} </h1>");
builder.append("\n%text ");
context.out.clear(false);
context.out.write(builder.toString());
@@ -214,7 +219,7 @@ public class JobManager {
isFirstPoll = false;
}
context.getAngularObjectRegistry().add("duration",
- rootNode.getObject().getLong("duration") / 1000,
+ toRichTimeDuration(duration),
context.getNoteId(),
context.getParagraphId());
}
@@ -224,15 +229,45 @@ public class JobManager {
}
}
- public void cancel () {
- this.running.set(false);
- synchronized (running) {
- running.notify();
- }
+ public void cancel() {
+ this.running.set(false);
+ synchronized (running) {
+ running.notify();
}
+ }
- public int getProgress () {
- return progress;
- }
+ public int getProgress() {
+ return progress;
}
}
+
+ /**
+ * Convert duration in seconds to rich time duration format. e.g. 2 days 3 hours 4 minutes 5 seconds
+ *
+ * @param duration in second
+ * @return
+ */
+ static String toRichTimeDuration(long duration) {
+ long days = TimeUnit.SECONDS.toDays(duration);
+ duration -= TimeUnit.DAYS.toSeconds(days);
+ long hours = TimeUnit.SECONDS.toHours(duration);
+ duration -= TimeUnit.HOURS.toSeconds(hours);
+ long minutes = TimeUnit.SECONDS.toMinutes(duration);
+ duration -= TimeUnit.MINUTES.toSeconds(minutes);
+ long seconds = TimeUnit.SECONDS.toSeconds(duration);
+
+ StringBuilder builder = new StringBuilder();
+ if (days != 0) {
+ builder.append(days + " days ");
+ }
+ if (days != 0 || hours != 0) {
+ builder.append(hours + " hours ");
+ }
+ if (days != 0 || hours != 0 || minutes != 0) {
+ builder.append(minutes + " minutes ");
+ }
+ builder.append(seconds + " seconds");
+ return builder.toString();
+ }
+
+}
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
new file mode 100644
index 0000000..fe196e9
--- /dev/null
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerTest {
+
+ @Test
+ public void testRichDuration() {
+ String richDuration = JobManager.toRichTimeDuration(18);
+ assertEquals("18 seconds", richDuration);
+
+ richDuration = JobManager.toRichTimeDuration(120);
+ assertEquals("2 minutes 0 seconds", richDuration);
+
+ richDuration = JobManager.toRichTimeDuration(60 * 60 + 1);
+ assertEquals("1 hours 0 minutes 1 seconds", richDuration);
+
+ richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1);
+ assertEquals("1 hours 1 minutes 1 seconds", richDuration);
+
+ richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1);
+ assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration);
+ }
+}