You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/02/28 18:55:19 UTC

[zeppelin] branch master updated: [ZEPPELIN-4619] Run a note from the commandline

This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c7d51a  [ZEPPELIN-4619] Run a note from the commandline
7c7d51a is described below

commit 7c7d51a2d5876456fe1a7eba2f985454213e0e32
Author: Lee moon soo <mo...@apache.org>
AuthorDate: Thu Feb 27 17:34:45 2020 -0800

    [ZEPPELIN-4619] Run a note from the commandline
    
    ### What is this PR for?
    This PR adds a command-line option to run a single note.
    The code is cherry-picked from https://github.com/apache/zeppelin/pull/3356.
    
    Usage is
    
    ```
    bin/zeppelin.sh --run <noteId>
    ```
    
    The command launches a Zeppelin server, runs the given note, and then terminates: with exit code 0 once all paragraphs have run successfully, or with exit code 1 as soon as any paragraph fails.
    
    ### What type of PR is it?
    Feature
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4619
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Lee moon soo <mo...@apache.org>
    
    This patch had conflicts when merged, resolved by
    Committer: Lee moon soo <mo...@apache.org>
    
    Closes #3654 from Leemoonsoo/ZEPPELIN-4619 and squashes the following commits:
    
    6a8316388 [Lee moon soo] remove useless test
    a7531a49f [Lee moon soo] address comment
    7743b43f6 [Lee moon soo] add log
    99974f89d [Lee moon soo] overload loading runAllParagraphs()
    101d1557b [Lee moon soo] add --run option
---
 bin/zeppelin.sh                                    | 34 +++++----
 .../zeppelin/conf/ZeppelinConfiguration.java       | 22 ++++++
 .../org/apache/zeppelin/server/ZeppelinServer.java | 87 +++++++++++++++++-----
 .../apache/zeppelin/service/NotebookService.java   | 33 ++++++--
 .../zeppelin/service/NotebookServiceTest.java      | 18 +++--
 5 files changed, 149 insertions(+), 45 deletions(-)

diff --git a/bin/zeppelin.sh b/bin/zeppelin.sh
index bf5aaba..9f32fb4 100755
--- a/bin/zeppelin.sh
+++ b/bin/zeppelin.sh
@@ -36,20 +36,26 @@ if [ -z "$uidentry" ] ; then
     fi
 fi
 
-USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>]"
-
-if [[ "$1" == "--config" ]]; then
-  shift
-  conf_dir="$1"
-  if [[ ! -d "${conf_dir}" ]]; then
-    echo "ERROR : ${conf_dir} is not a directory"
-    echo ${USAGE}
-    exit 1
-  else
-    export ZEPPELIN_CONF_DIR="${conf_dir}"
-  fi
-  shift
-fi
+USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>] [--run <noteId>]"
+
+POSITIONAL=()
+while [[ $# -gt 0 ]]
+do
+  key="$1"
+  case $key in
+    --config)
+    export ZEPPELIN_CONF_DIR="$2"
+    shift # past argument
+    shift # past value
+    ;;
+    --run)
+    export ZEPPELIN_NOTEBOOK_RUN_ID="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  esac
+done
+set -- "${POSITIONAL[@]}" # restore positional parameters
 
 bin=$(dirname "${BASH_SOURCE-$0}")
 bin=$(cd "${bin}">/dev/null; pwd)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 78fdb86..43f9ec7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -397,6 +397,22 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
   }
 
+  public String getNotebookRunId() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_ID);
+  }
+
+  public String getNotebookRunRev() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_REV);
+  }
+
+  public String getNotebookRunServiceContext() {
+    return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT);
+  }
+
+  public boolean getNotebookRunAutoShutdown() {
+    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN);
+  }
+
   public String getPluginsDir() {
     return getRelativeDir(getString(ConfVars.ZEPPELIN_PLUGINS_DIR));
   }
@@ -831,6 +847,12 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+
+    ZEPPELIN_NOTEBOOK_RUN_ID("zeppelin.notebook.run.id", null),   // run particular note id on zeppelin start
+    ZEPPELIN_NOTEBOOK_RUN_REV("zeppelin.notebook.run.rev", null), // revision id for ZEPPELIN_NOTEBOOK_RUN_ID.
+    ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT("zeppelin.notebook.run.servicecontext", null), // base64 encoded serialized service context to be used ZEPPELIN_NOTEBOOK_RUN_ID.
+    ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN("zeppelin.notebook.run.autoshutdown", true), // after specified note (ZEPPELIN_NOTEBOOK_RUN_ID) run, shutdown zeppelin server
+
     ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
     ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
         "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 258c930..0e5be41 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -16,9 +16,12 @@
  */
 package org.apache.zeppelin.server;
 
+import com.google.gson.Gson;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.Base64;
+import java.util.HashSet;
 import java.util.List;
 import java.util.EnumSet;
 import java.util.Objects;
@@ -30,6 +33,7 @@ import javax.servlet.DispatcherType;
 import javax.servlet.ServletContextEvent;
 import javax.servlet.ServletContextListener;
 import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
 import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
@@ -49,6 +53,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.notebook.NoteEventListener;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.AuthorizationService;
+import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.notebook.scheduler.NoSchedulerService;
@@ -61,6 +66,7 @@ import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.service.*;
 import org.apache.zeppelin.service.AuthenticationService;
 import org.apache.zeppelin.socket.NotebookServer;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
 import org.apache.zeppelin.util.ReflectionUtils;
 import org.eclipse.jetty.http.HttpVersion;
@@ -117,7 +123,7 @@ public class ZeppelinServer extends ResourceConfig {
     packages("org.apache.zeppelin.rest");
   }
 
-  public static void main(String[] args) throws InterruptedException {
+  public static void main(String[] args) throws InterruptedException, IOException {
     ZeppelinServer.conf = ZeppelinConfiguration.create();
     conf.setProperty("args", args);
 
@@ -254,23 +260,9 @@ public class ZeppelinServer extends ResourceConfig {
     }
     LOG.info("Done, zeppelin server started");
 
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  LOG.info("Shutting down Zeppelin Server ... ");
-                  try {
-                    jettyWebServer.stop();
-                    if (!conf.isRecoveryEnabled()) {
-                      sharedServiceLocator.getService(InterpreterSettingManager.class).close();
-                    }
-                    sharedServiceLocator.getService(Notebook.class).close();
-                    Thread.sleep(3000);
-                  } catch (Exception e) {
-                    LOG.error("Error while stopping servlet container", e);
-                  }
-                  LOG.info("Bye");
-                }));
+    runNoteOnStart(conf);
+
+    Runtime.getRuntime().addShutdownHook(shutdown(conf));
 
     // when zeppelin is started inside of ide (especially for eclipse)
     // for graceful shutdown, input any key in console window
@@ -289,6 +281,24 @@ public class ZeppelinServer extends ResourceConfig {
     }
   }
 
+  private static Thread shutdown(ZeppelinConfiguration conf) {
+    return new Thread(
+            () -> {
+              LOG.info("Shutting down Zeppelin Server ... ");
+              try {
+                jettyWebServer.stop();
+                if (!conf.isRecoveryEnabled()) {
+                  sharedServiceLocator.getService(InterpreterSettingManager.class).close();
+                }
+                sharedServiceLocator.getService(Notebook.class).close();
+                Thread.sleep(3000);
+              } catch (Exception e) {
+                LOG.error("Error while stopping servlet container", e);
+              }
+              LOG.info("Bye");
+            });
+  }
+
   private static Server setupJettyServer(ZeppelinConfiguration conf) {
     ThreadPool threadPool =
       new QueuedThreadPool(conf.getInt(ConfVars.ZEPPELIN_SERVER_JETTY_THREAD_POOL_MAX),
@@ -332,6 +342,47 @@ public class ZeppelinServer extends ResourceConfig {
     server.addConnector(connector);
   }
 
+  private static void runNoteOnStart(ZeppelinConfiguration conf) throws IOException, InterruptedException {
+    String noteIdToRun = conf.getNotebookRunId();
+    if (!Strings.isEmpty(noteIdToRun)) {
+      LOG.info("Running note {} on start", noteIdToRun);
+      NotebookService notebookService = (NotebookService) ServiceLocatorUtilities.getService(
+              sharedServiceLocator, NotebookService.class.getName());
+
+      ServiceContext serviceContext;
+      String base64EncodedJsonSerializedServiceContext = conf.getNotebookRunServiceContext();
+      if (Strings.isEmpty(base64EncodedJsonSerializedServiceContext)) {
+        LOG.info("No service context provided. use ANONYMOUS");
+        serviceContext = new ServiceContext(AuthenticationInfo.ANONYMOUS, new HashSet<String>() {});
+      } else {
+        serviceContext = new Gson().fromJson(
+                new String(Base64.getDecoder().decode(base64EncodedJsonSerializedServiceContext)),
+                ServiceContext.class);
+      }
+
+      boolean success = notebookService.runAllParagraphs(noteIdToRun, null, serviceContext, new ServiceCallback<Paragraph>() {
+        @Override
+        public void onStart(String message, ServiceContext context) throws IOException {
+        }
+
+        @Override
+        public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+        }
+
+        @Override
+        public void onFailure(Exception ex, ServiceContext context) throws IOException {
+        }
+      });
+
+      if (conf.getNotebookRunAutoShutdown()) {
+        Thread t = shutdown(conf);
+        t.start();
+        t.join();
+        System.exit(success ? 0 : 1);
+      }
+    }
+  }
+
   private static void configureRequestHeaderSize(
       ZeppelinConfiguration conf, ServerConnector connector) {
     HttpConnectionFactory cf =
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 7ed3331..99ed6bb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -22,6 +22,8 @@ package org.apache.zeppelin.service;
 import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
 
 import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -30,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.inject.Inject;
+
+import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
@@ -82,6 +86,7 @@ public class NotebookService {
   private Notebook notebook;
   private AuthorizationService authorizationService;
   private SchedulerService schedulerService;
+  private Gson gson = new Gson();
 
   @Inject
   public NotebookService(
@@ -344,19 +349,35 @@ public class NotebookService {
     }
   }
 
-  public void runAllParagraphs(String noteId,
+  /**
+   * Run list of paragraphs. This method runs provided paragraphs one by one, synchronously.
+   * When a paragraph fails, subsequent paragraphs are not going to run and this method returns false.
+   * When list of paragraphs provided from argument is null, list of paragraphs stored in the Note will be used.
+   *
+   * @param noteId
+   * @param paragraphs list of paragraphs to run. List of paragraph stored in the Note will be used when null.
+   * @param context
+   * @param callback
+   * @return true when all paragraphs successfully run. false when any paragraph fails.
+   * @throws IOException
+   */
+  public boolean runAllParagraphs(String noteId,
                                List<Map<String, Object>> paragraphs,
                                ServiceContext context,
                                ServiceCallback<Paragraph> callback) throws IOException {
     if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_ALL_PARAGRAPHS, context,
         callback)) {
-      return;
+      return false;
     }
 
     Note note = notebook.getNote(noteId);
     if (note == null) {
       callback.onFailure(new NoteNotFoundException(noteId), context);
-      return;
+      return false;
+    }
+
+    if (paragraphs == null) {
+      paragraphs = gson.fromJson(gson.toJson(note.getParagraphs()), new TypeToken<List>(){}.getType());
     }
 
     note.setRunning(true);
@@ -366,7 +387,7 @@ public class NotebookService {
         if (paragraphId == null) {
           continue;
         }
-        String text = (String) raw.get("paragraph");
+        String text = (String) raw.get("text");
         String title = (String) raw.get("title");
         Map<String, Object> params = (Map<String, Object>) raw.get("params");
         Map<String, Object> config = (Map<String, Object>) raw.get("config");
@@ -374,12 +395,14 @@ public class NotebookService {
         if (!runParagraph(noteId, paragraphId, title, text, params, config, false, true,
                 context, callback)) {
           // stop execution when one paragraph fails.
-          break;
+          return false;
         }
       }
     } finally {
       note.setRunning(false);
     }
+
+    return true;
   }
 
   public void cancelParagraph(String noteId,
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index f77edc2..e56b58a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -361,20 +361,22 @@ public class NotebookServiceTest {
 
     // run all paragraphs
     reset(callback);
+    String textBefore = p.getText();
     notebookService.runAllParagraphs(
             note1.getId(),
             gson.fromJson(gson.toJson(note1.getParagraphs()), new TypeToken<List>(){}.getType()),
             context, callback);
     verify(callback, times(2)).onSuccess(any(), any());
+    assertEquals(textBefore, p.getText());
+
+    // run all paragraphs, with null paragraph list provided
+    reset(callback);
+    notebookService.runAllParagraphs(
+            note1.getId(),
+            null,
+            context, callback);
+    verify(callback, times(2)).onSuccess(any(), any());
 
-    // run paragraph synchronously via invalid code
-    //TODO(zjffdu) must sleep for a while, otherwise will get wrong status. This should be due to
-    //bug of job component.
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
     reset(callback);
     runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "invalid_code",
         new HashMap<>(), new HashMap<>(), false, true, context, callback);