Posted to commits@zeppelin.apache.org by jo...@apache.org on 2016/06/15 11:09:24 UTC

zeppelin git commit: [ZEPPELIN-963] Job management - (1) basic backend.

Repository: zeppelin
Updated Branches:
  refs/heads/master 020e0e7f8 -> cfe677b82


[ZEPPELIN-963] Job management - (1) basic backend.

### What is this PR for?
Basic backend for job management.
This work was split out of a larger PR (https://github.com/apache/incubator-zeppelin/pull/921) into smaller PRs.
This PR implements the backend and the REST API that return the basic data for the job manager.

### What type of PR is it?
Feature

### Todos
- [x] - Basic backend for job manager.
- [x] - Basic backend api

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-963

### How should this be tested?
#### step 1.
First, call the REST API as follows to retrieve the job data for all notebooks.
```shell
curl -H "Content-Type: application/json" -X GET http://127.0.0.1:8080/api/notebook/jobmanager/
```
Result:
```json
{
   "status":"OK", // result of the API request
   "body":{
      "lastResponseUnixTime":1465289938763, // server Unix time (ms) at which this response was generated
      "jobs":[  // job list
         {
            "notebookId":"2BMQZ9QP6",  // notebook id
            "unixTimeLastRun":1465289017310, // Unix time (ms) of the notebook's last run
            "notebookType":"normal", // "cron" or "normal"
            "isRunningJob":false, // true if any paragraph is currently running
            "notebookName":"Untitled Note 1226", // notebook name
            "interpreter":"spark", // default interpreter group name; omitted if no interpreter is selected
            "paragraphs":[
               {
                  "name":"20160607-174331_232775609", // paragraph title; falls back to the paragraph id if no title is set
                  "id":"20160607-174331_232775609", // paragraph id
                  "status":"FINISHED" // paragraph job status
               }
            ]
         }
      ]
   }
}
```
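
A client has to carry `lastResponseUnixTime` from one response into its next request. As a minimal sketch, assuming the response body has the shape shown above, the value could be extracted with a regular expression. The class and method names below are hypothetical and not part of Zeppelin; a real client would more likely use a JSON library.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Zeppelin.
public class JobManagerResponse {

    // Matches "lastResponseUnixTime": <digits> anywhere in the response text.
    private static final Pattern LAST_RESPONSE =
        Pattern.compile("\"lastResponseUnixTime\"\\s*:\\s*(\\d{1,18})");

    // Returns the timestamp if the field is present, otherwise an empty result.
    public static OptionalLong lastResponseUnixTime(String json) {
        Matcher m = LAST_RESPONSE.matcher(json);
        return m.find()
            ? OptionalLong.of(Long.parseLong(m.group(1)))
            : OptionalLong.empty();
    }

    public static void main(String[] args) {
        String body = "{\"status\":\"OK\",\"body\":"
            + "{\"lastResponseUnixTime\":1465289938763,\"jobs\":[]}}";
        System.out.println(lastResponseUnixTime(body).getAsLong());
    }
}
```

The regular expression ignores the `//` annotations used in the sample above, which would not appear in an actual JSON response.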

#### step 2.
The example above shows the result for a single notebook.
Note the "lastResponseUnixTime" value.
Passing this value as an argument to the following REST API call returns only the data updated since that time.

**Create a notebook or run a paragraph.**
Then **call the updated-notebook API.**

**"lastResponseUnixTime": 1465289938763** (the value obtained in step 1)
```shell
curl -H "Content-Type: application/json" -X GET http://127.0.0.1:8080/api/notebook/jobmanager/1465289938763
```
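
The two calls differ only in whether the timestamp is appended to the path. The following is a small sketch of how a client might build both request URIs; the class name is hypothetical, and the host and port are simply those used in the curl examples, so they are assumptions about a local test server.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not part of Zeppelin.
public class JobManagerUrls {

    // Base path taken from the curl examples; host and port assume a local server.
    static final String BASE = "http://127.0.0.1:8080/api/notebook/jobmanager/";

    // URI for the full job listing (step 1).
    public static URI allJobs() {
        return URI.create(BASE);
    }

    // URI for the incremental listing (step 2), using the value from the previous response.
    public static URI updatedSince(long lastResponseUnixTime) {
        if (lastResponseUnixTime < 0) {
            throw new IllegalArgumentException("timestamp must not be negative");
        }
        return URI.create(BASE + lastResponseUnixTime);
    }

    public static void main(String[] args) {
        System.out.println(allJobs());
        System.out.println(updatedSince(1465289938763L));
    }
}
```

The resulting `URI` can be handed to any HTTP client to issue the GET request.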

#### step 3.
If you created a notebook, the new notebook's information is returned.
If a notebook has a running paragraph, its information is returned as well.
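
The selection rule behind this step can be sketched as follows. This is a simplified model of the logic in the `Notebook.java` part of this commit, not the actual code: `ParagraphInfo` and the method names are hypothetical stand-ins, and a timestamp of 0 is assumed to mean "not set".

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the selection rule; not Zeppelin's actual classes.
public class JobFilterSketch {

    // Stand-in for a paragraph. Timestamps are Unix time in ms; 0 means "not set".
    public static final class ParagraphInfo {
        final long created;
        final long started;
        final long finished;
        final boolean running;

        public ParagraphInfo(long created, long started, long finished, boolean running) {
            this.created = created;
            this.started = started;
            this.finished = finished;
            this.running = running;
        }
    }

    // Later of started/finished, falling back to the creation time when neither is set.
    public static long lastActivity(ParagraphInfo p) {
        long latest = Math.max(p.started, p.finished);
        return latest > 0 ? latest : p.created;
    }

    // A notebook is reported if any paragraph is running or changed after the given time.
    public static boolean isReported(List<ParagraphInfo> paragraphs, long lastUpdateUnixTime) {
        for (ParagraphInfo p : paragraphs) {
            if (p.running || lastActivity(p) > lastUpdateUnixTime) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<ParagraphInfo> note = Arrays.asList(new ParagraphInfo(100L, 200L, 300L, false));
        System.out.println(isReported(note, 0L));   // full listing uses 0 as the cutoff
        System.out.println(isReported(note, 300L)); // nothing is newer than the cutoff
    }
}
```

Because the comparison is strictly greater-than, a paragraph whose last activity equals the supplied timestamp is not treated as updated.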

#### step 4.
When you call the step 1 API again after creating a notebook, the number of notebooks returned is one more than in the first call.

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: CloverHearts <cl...@gmail.com>

Closes #972 from cloverhearts/feat/sm/ZEPPELIN-531-basic-backend and squashes the following commits:

330ed73 [CloverHearts] change indent for jobmanager backend
c7bf75a [CloverHearts] Jobmanager basic backend.


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/cfe677b8
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/cfe677b8
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/cfe677b8

Branch: refs/heads/master
Commit: cfe677b827249171c3fcebca25cc215a50ec06fa
Parents: 020e0e7
Author: CloverHearts <cl...@gmail.com>
Authored: Tue Jun 7 18:01:23 2016 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Wed Jun 15 20:09:15 2016 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/rest/NotebookRestApi.java   |  47 ++++++-
 .../apache/zeppelin/socket/NotebookServer.java  |  48 +++++++
 .../org/apache/zeppelin/notebook/Notebook.java  | 132 +++++++++++++++++++
 .../zeppelin/notebook/socket/Message.java       |  10 +-
 4 files changed, 232 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cfe677b8/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index 6c50ee4..2bbe796 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -686,7 +686,52 @@ public class NotebookRestApi {
     }
     
     return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build();
-  }  
+  }
+
+  /**
+   * Get notebook jobs for job manager
+   * @param
+   * @return JSON with status.OK
+   * @throws IOException, IllegalArgumentException
+   */
+  @GET
+  @Path("jobmanager/")
+  @ZeppelinApi
+  public Response getJobListforNotebook() throws IOException, IllegalArgumentException {
+    LOG.info("Get notebook jobs for job manager");
+
+    List<Map<String, Object>> notebookJobs = notebook.getJobListforNotebook(false, 0);
+    Map<String, Object> response = new HashMap<>();
+
+    response.put("lastResponseUnixTime", System.currentTimeMillis());
+    response.put("jobs", notebookJobs);
+
+    return new JsonResponse<>(Status.OK, response).build();
+  }
+
+  /**
+   * Get updated notebook jobs for job manager
+   * @param
+   * @return JSON with status.OK
+   * @throws IOException, IllegalArgumentException
+   */
+  @GET
+  @Path("jobmanager/{lastUpdateUnixtime}/")
+  @ZeppelinApi
+  public Response getUpdatedJobListforNotebook(
+      @PathParam("lastUpdateUnixtime") long lastUpdateUnixTime) throws
+      IOException, IllegalArgumentException {
+    LOG.info("Get updated notebook jobs lastUpdateTime {}", lastUpdateUnixTime);
+
+    List<Map<String, Object>> notebookJobs;
+    notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime);
+    Map<String, Object> response = new HashMap<>();
+
+    response.put("lastResponseUnixTime", System.currentTimeMillis());
+    response.put("jobs", notebookJobs);
+
+    return new JsonResponse<>(Status.OK, response).build();
+  }
 
   /**
    * Search for a Notes with permissions

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cfe677b8/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 7d50809..a94fefa 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -61,6 +61,20 @@ import org.slf4j.LoggerFactory;
 public class NotebookServer extends WebSocketServlet implements
         NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
         RemoteInterpreterProcessListener {
+  /**
+   * Job manager service type
+   */
+  protected enum JOB_MANAGER_SERVICE {
+    JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
+    private String serviceTypeKey;
+    JOB_MANAGER_SERVICE(String serviceType) {
+      this.serviceTypeKey = serviceType;
+    }
+    String getKey() {
+      return this.serviceTypeKey;
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
   Gson gson = new GsonBuilder()
           .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
@@ -203,6 +217,12 @@ public class NotebookServer extends WebSocketServlet implements
           case CHECKPOINT_NOTEBOOK:
             checkpointNotebook(conn, notebook, messagereceived);
             break;
+          case LIST_NOTEBOOK_JOBS:
+            unicastNotebookJobInfo(conn);
+            break;
+          case LIST_UPDATE_NOTEBOOK_JOBS:
+            unicastUpdateNotebookJobInfo(conn, messagereceived);
+            break;
           default:
             break;
       }
@@ -350,6 +370,34 @@ public class NotebookServer extends WebSocketServlet implements
     }
   }
 
+  public void unicastNotebookJobInfo(NotebookSocket conn) throws IOException {
+
+    List<Map<String, Object>> notebookJobs = notebook().getJobListforNotebook(false, 0);
+    Map<String, Object> response = new HashMap<>();
+
+    response.put("lastResponseUnixTime", System.currentTimeMillis());
+    response.put("jobs", notebookJobs);
+
+    conn.send(serializeMessage(new Message(OP.LIST_NOTEBOOK_JOBS)
+      .put("notebookJobs", response)));
+  }
+
+  public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage)
+      throws IOException {
+    double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime");
+    long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue();
+
+    List<Map<String, Object>> notebookJobs;
+    notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime);
+
+    Map<String, Object> response = new HashMap<>();
+    response.put("lastResponseUnixTime", System.currentTimeMillis());
+    response.put("jobs", notebookJobs);
+
+    conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS)
+            .put("notebookRunningJobs", response)));
+  }
+
   public List<Map<String, String>> generateNotebooksInfo(boolean needsReload) {
     Notebook notebook = notebook();
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cfe677b8/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 4e61111..58a552d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -489,6 +489,138 @@ public class Notebook {
     this.jobListenerFactory = jobListenerFactory;
   }
 
+  private Map<String, Object> getParagraphForJobManagerItem(Paragraph paragraph) {
+    Map<String, Object> paragraphItem = new HashMap<>();
+
+    // set paragraph id
+    paragraphItem.put("id", paragraph.getId());
+
+    // set paragraph name
+    String paragraphName = paragraph.getTitle();
+    if (paragraphName != null) {
+      paragraphItem.put("name", paragraphName);
+    } else {
+      paragraphItem.put("name", paragraph.getId());
+    }
+
+    // set status for paragraph.
+    paragraphItem.put("status", paragraph.getStatus().toString());
+
+    return paragraphItem;
+  }
+
+  private long getUnixTimeLastRunParagraph(Paragraph paragraph) {
+
+    Date lastRunningDate = null;
+    long lastRunningUnixTime = 0;
+
+    Date paragaraphDate = paragraph.getDateStarted();
+    // compare started time <-> finished time
+    if (paragaraphDate == null) {
+      paragaraphDate = paragraph.getDateFinished();
+    } else {
+      if (paragraph.getDateFinished() != null &&
+          paragraph.getDateFinished().after(paragaraphDate)) {
+        paragaraphDate = paragraph.getDateFinished();
+      }
+    }
+
+    // neither finished time nor started time exists.
+    if (paragaraphDate == null) {
+      paragaraphDate = paragraph.getDateCreated();
+    }
+
+    // set last update unixtime(ms).
+    lastRunningDate = paragaraphDate;
+
+    lastRunningUnixTime = lastRunningDate.getTime();
+
+    return lastRunningUnixTime;
+  }
+
+  public List<Map<String, Object>> getJobListforNotebook(boolean needsReload,
+      long lastUpdateServerUnixTime) {
+    final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
+
+    if (needsReload) {
+      try {
+        reloadAllNotes();
+      } catch (IOException e) {
+        logger.error("Fail to reload notes from repository");
+      }
+    }
+
+    List<Note> notes = getAllNotes();
+    List<Map<String, Object>> notesInfo = new LinkedList<>();
+    for (Note note : notes) {
+      boolean isNotebookRunning = false;
+      boolean isUpdateNotebook = false;
+      long lastRunningUnixTime = 0;
+      Map<String, Object> info = new HashMap<>();
+
+      // set notebook ID
+      info.put("notebookId", note.id());
+
+      // set notebook Name
+      String notebookName = note.getName();
+      if (notebookName != null) {
+        info.put("notebookName", note.getName());
+      } else {
+        info.put("notebookName", "Note " + note.id());
+      }
+
+      // set notebook type ( cron or normal )
+      if (note.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) == true &&
+          !note.getConfig().get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
+        info.put("notebookType", "cron");
+      }
+      else {
+        info.put("notebookType", "normal");
+      }
+
+      // set paragraphs
+      List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
+      for (Paragraph paragraph : note.getParagraphs()) {
+        // check paragraph's status.
+        if (paragraph.getStatus().isRunning() == true) {
+          isNotebookRunning = true;
+          isUpdateNotebook = true;
+        }
+
+        // get data for the job manager.
+        Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
+        lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
+
+        // is update notebook for last server update time.
+        if (lastRunningUnixTime > lastUpdateServerUnixTime) {
+          paragraphsInfo.add(paragraphItem);
+          isUpdateNotebook = true;
+        }
+      }
+
+      // set interpreter bind type
+      String interpreterGroupName = null;
+      if (note.getNoteReplLoader().getInterpreterSettings() != null &&
+          note.getNoteReplLoader().getInterpreterSettings().size() >= 1) {
+        interpreterGroupName = note.getNoteReplLoader().getInterpreterSettings().get(0).getGroup();
+      }
+
+      // not update and not running -> pass
+      if (isUpdateNotebook == false && isNotebookRunning == false) {
+        continue;
+      }
+
+      // notebook json object root information.
+      info.put("interpreter", interpreterGroupName);
+      info.put("isRunningJob", isNotebookRunning);
+      info.put("unixTimeLastRun", lastRunningUnixTime);
+      info.put("paragraphs", paragraphsInfo);
+      notesInfo.add(info);
+    }
+
+    return notesInfo;
+  }
+
   /**
    * Cron task for the note.
    */

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cfe677b8/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index a3fc048..320709e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -111,10 +111,12 @@ public class Message {
     CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
                   // @param settings serialized Map<String, String> object
 
-    CHECKPOINT_NOTEBOOK     // [c-s] checkpoint notebook to storage repository
-                            // @param noteId
-                            // @param checkpointName
-
+    CHECKPOINT_NOTEBOOK,     // [c-s] checkpoint notebook to storage repository
+                             // @param noteId
+                             // @param checkpointName
+    LIST_NOTEBOOK_JOBS,     // [c-s] get notebook job management information
+    LIST_UPDATE_NOTEBOOK_JOBS // [c-s] get job management information updated since unixtime
+                               // @param unixTime
   }
 
   public OP op;