You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/02/28 18:55:19 UTC
[zeppelin] branch master updated: [ZEPPELIN-4619] Run a note from
the commandline
This is an automated email from the ASF dual-hosted git repository.
moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 7c7d51a [ZEPPELIN-4619] Run a note from the commandline
7c7d51a is described below
commit 7c7d51a2d5876456fe1a7eba2f985454213e0e32
Author: Lee moon soo <mo...@apache.org>
AuthorDate: Thu Feb 27 17:34:45 2020 -0800
[ZEPPELIN-4619] Run a note from the commandline
### What is this PR for?
This PR adds a command-line option to run a single note.
The code is cherry-picked from https://github.com/apache/zeppelin/pull/3356.
Usage is
```
bin/zeppelin.sh --run <noteId>
```
The command launches a Zeppelin server, runs the given note, and then terminates: with exit code 0 after all paragraphs have run successfully, or with exit code 1 as soon as any paragraph fails.
### What type of PR is it?
Feature
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-4619
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: Lee moon soo <mo...@apache.org>
This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>
Closes #3654 from Leemoonsoo/ZEPPELIN-4619 and squashes the following commits:
6a8316388 [Lee moon soo] remove useless test
a7531a49f [Lee moon soo] address comment
7743b43f6 [Lee moon soo] add log
99974f89d [Lee moon soo] overload loading runAllParagraphs()
101d1557b [Lee moon soo] add --run option
---
bin/zeppelin.sh | 34 +++++----
.../zeppelin/conf/ZeppelinConfiguration.java | 22 ++++++
.../org/apache/zeppelin/server/ZeppelinServer.java | 87 +++++++++++++++++-----
.../apache/zeppelin/service/NotebookService.java | 33 ++++++--
.../zeppelin/service/NotebookServiceTest.java | 18 +++--
5 files changed, 149 insertions(+), 45 deletions(-)
diff --git a/bin/zeppelin.sh b/bin/zeppelin.sh
index bf5aaba..9f32fb4 100755
--- a/bin/zeppelin.sh
+++ b/bin/zeppelin.sh
@@ -36,20 +36,26 @@ if [ -z "$uidentry" ] ; then
fi
fi
-USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>]"
-
-if [[ "$1" == "--config" ]]; then
- shift
- conf_dir="$1"
- if [[ ! -d "${conf_dir}" ]]; then
- echo "ERROR : ${conf_dir} is not a directory"
- echo ${USAGE}
- exit 1
- else
- export ZEPPELIN_CONF_DIR="${conf_dir}"
- fi
- shift
-fi
+USAGE="Usage: bin/zeppelin.sh [--config <conf-dir>] [--run <noteId>]"
+
+POSITIONAL=()
+while [[ $# -gt 0 ]]
+do
+ key="$1"
+ case $key in
+ --config)
+ export ZEPPELIN_CONF_DIR="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ --run)
+ export ZEPPELIN_NOTEBOOK_RUN_ID="$2"
+ shift # past argument
+ shift # past value
+ ;;
+ esac
+done
+set -- "${POSITIONAL[@]}" # restore positional parameters
bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 78fdb86..43f9ec7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -397,6 +397,22 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
}
+ public String getNotebookRunId() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_ID);
+ }
+
+ public String getNotebookRunRev() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_REV);
+ }
+
+ public String getNotebookRunServiceContext() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT);
+ }
+
+ public boolean getNotebookRunAutoShutdown() {
+ return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN);
+ }
+
public String getPluginsDir() {
return getRelativeDir(getString(ConfVars.ZEPPELIN_PLUGINS_DIR));
}
@@ -831,6 +847,12 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+
+ ZEPPELIN_NOTEBOOK_RUN_ID("zeppelin.notebook.run.id", null), // run particular note id on zeppelin start
+ ZEPPELIN_NOTEBOOK_RUN_REV("zeppelin.notebook.run.rev", null), // revision id for ZEPPELIN_NOTEBOOK_RUN_ID.
+ ZEPPELIN_NOTEBOOK_RUN_SERVICE_CONTEXT("zeppelin.notebook.run.servicecontext", null), // base64 encoded serialized service context to be used ZEPPELIN_NOTEBOOK_RUN_ID.
+ ZEPPELIN_NOTEBOOK_RUN_AUTOSHUTDOWN("zeppelin.notebook.run.autoshutdown", true), // after specified note (ZEPPELIN_NOTEBOOK_RUN_ID) run, shutdown zeppelin server
+
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 258c930..0e5be41 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -16,9 +16,12 @@
*/
package org.apache.zeppelin.server;
+import com.google.gson.Gson;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.util.Base64;
+import java.util.HashSet;
import java.util.List;
import java.util.EnumSet;
import java.util.Objects;
@@ -30,6 +33,7 @@ import javax.servlet.DispatcherType;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.apache.shiro.web.servlet.ShiroFilter;
import org.apache.zeppelin.cluster.ClusterManagerServer;
@@ -49,6 +53,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.NoteEventListener;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.AuthorizationService;
+import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.notebook.scheduler.NoSchedulerService;
@@ -61,6 +66,7 @@ import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.service.*;
import org.apache.zeppelin.service.AuthenticationService;
import org.apache.zeppelin.socket.NotebookServer;
+import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.apache.zeppelin.util.ReflectionUtils;
import org.eclipse.jetty.http.HttpVersion;
@@ -117,7 +123,7 @@ public class ZeppelinServer extends ResourceConfig {
packages("org.apache.zeppelin.rest");
}
- public static void main(String[] args) throws InterruptedException {
+ public static void main(String[] args) throws InterruptedException, IOException {
ZeppelinServer.conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
@@ -254,23 +260,9 @@ public class ZeppelinServer extends ResourceConfig {
}
LOG.info("Done, zeppelin server started");
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- LOG.info("Shutting down Zeppelin Server ... ");
- try {
- jettyWebServer.stop();
- if (!conf.isRecoveryEnabled()) {
- sharedServiceLocator.getService(InterpreterSettingManager.class).close();
- }
- sharedServiceLocator.getService(Notebook.class).close();
- Thread.sleep(3000);
- } catch (Exception e) {
- LOG.error("Error while stopping servlet container", e);
- }
- LOG.info("Bye");
- }));
+ runNoteOnStart(conf);
+
+ Runtime.getRuntime().addShutdownHook(shutdown(conf));
// when zeppelin is started inside of ide (especially for eclipse)
// for graceful shutdown, input any key in console window
@@ -289,6 +281,24 @@ public class ZeppelinServer extends ResourceConfig {
}
}
+ private static Thread shutdown(ZeppelinConfiguration conf) {
+ return new Thread(
+ () -> {
+ LOG.info("Shutting down Zeppelin Server ... ");
+ try {
+ jettyWebServer.stop();
+ if (!conf.isRecoveryEnabled()) {
+ sharedServiceLocator.getService(InterpreterSettingManager.class).close();
+ }
+ sharedServiceLocator.getService(Notebook.class).close();
+ Thread.sleep(3000);
+ } catch (Exception e) {
+ LOG.error("Error while stopping servlet container", e);
+ }
+ LOG.info("Bye");
+ });
+ }
+
private static Server setupJettyServer(ZeppelinConfiguration conf) {
ThreadPool threadPool =
new QueuedThreadPool(conf.getInt(ConfVars.ZEPPELIN_SERVER_JETTY_THREAD_POOL_MAX),
@@ -332,6 +342,47 @@ public class ZeppelinServer extends ResourceConfig {
server.addConnector(connector);
}
+ private static void runNoteOnStart(ZeppelinConfiguration conf) throws IOException, InterruptedException {
+ String noteIdToRun = conf.getNotebookRunId();
+ if (!Strings.isEmpty(noteIdToRun)) {
+ LOG.info("Running note {} on start", noteIdToRun);
+ NotebookService notebookService = (NotebookService) ServiceLocatorUtilities.getService(
+ sharedServiceLocator, NotebookService.class.getName());
+
+ ServiceContext serviceContext;
+ String base64EncodedJsonSerializedServiceContext = conf.getNotebookRunServiceContext();
+ if (Strings.isEmpty(base64EncodedJsonSerializedServiceContext)) {
+ LOG.info("No service context provided. use ANONYMOUS");
+ serviceContext = new ServiceContext(AuthenticationInfo.ANONYMOUS, new HashSet<String>() {});
+ } else {
+ serviceContext = new Gson().fromJson(
+ new String(Base64.getDecoder().decode(base64EncodedJsonSerializedServiceContext)),
+ ServiceContext.class);
+ }
+
+ boolean success = notebookService.runAllParagraphs(noteIdToRun, null, serviceContext, new ServiceCallback<Paragraph>() {
+ @Override
+ public void onStart(String message, ServiceContext context) throws IOException {
+ }
+
+ @Override
+ public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+ }
+
+ @Override
+ public void onFailure(Exception ex, ServiceContext context) throws IOException {
+ }
+ });
+
+ if (conf.getNotebookRunAutoShutdown()) {
+ Thread t = shutdown(conf);
+ t.start();
+ t.join();
+ System.exit(success ? 0 : 1);
+ }
+ }
+ }
+
private static void configureRequestHeaderSize(
ZeppelinConfiguration conf, ServerConnector connector) {
HttpConnectionFactory cf =
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 7ed3331..99ed6bb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -22,6 +22,8 @@ package org.apache.zeppelin.service;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -30,6 +32,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
+
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
@@ -82,6 +86,7 @@ public class NotebookService {
private Notebook notebook;
private AuthorizationService authorizationService;
private SchedulerService schedulerService;
+ private Gson gson = new Gson();
@Inject
public NotebookService(
@@ -344,19 +349,35 @@ public class NotebookService {
}
}
- public void runAllParagraphs(String noteId,
+ /**
+ * Run list of paragraphs. This method runs provided paragraphs one by one, synchronously.
+ * When a paragraph fails, subsequent paragraphs are not going to run and this method returns false.
+ * When list of paragraphs provided from argument is null, list of paragraphs stored in the Note will be used.
+ *
+ * @param noteId
+ * @param paragraphs list of paragraphs to run. List of paragraph stored in the Note will be used when null.
+ * @param context
+ * @param callback
+ * @return true when all paragraphs successfully run. false when any paragraph fails.
+ * @throws IOException
+ */
+ public boolean runAllParagraphs(String noteId,
List<Map<String, Object>> paragraphs,
ServiceContext context,
ServiceCallback<Paragraph> callback) throws IOException {
if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_ALL_PARAGRAPHS, context,
callback)) {
- return;
+ return false;
}
Note note = notebook.getNote(noteId);
if (note == null) {
callback.onFailure(new NoteNotFoundException(noteId), context);
- return;
+ return false;
+ }
+
+ if (paragraphs == null) {
+ paragraphs = gson.fromJson(gson.toJson(note.getParagraphs()), new TypeToken<List>(){}.getType());
}
note.setRunning(true);
@@ -366,7 +387,7 @@ public class NotebookService {
if (paragraphId == null) {
continue;
}
- String text = (String) raw.get("paragraph");
+ String text = (String) raw.get("text");
String title = (String) raw.get("title");
Map<String, Object> params = (Map<String, Object>) raw.get("params");
Map<String, Object> config = (Map<String, Object>) raw.get("config");
@@ -374,12 +395,14 @@ public class NotebookService {
if (!runParagraph(noteId, paragraphId, title, text, params, config, false, true,
context, callback)) {
// stop execution when one paragraph fails.
- break;
+ return false;
}
}
} finally {
note.setRunning(false);
}
+
+ return true;
}
public void cancelParagraph(String noteId,
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index f77edc2..e56b58a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -361,20 +361,22 @@ public class NotebookServiceTest {
// run all paragraphs
reset(callback);
+ String textBefore = p.getText();
notebookService.runAllParagraphs(
note1.getId(),
gson.fromJson(gson.toJson(note1.getParagraphs()), new TypeToken<List>(){}.getType()),
context, callback);
verify(callback, times(2)).onSuccess(any(), any());
+ assertEquals(textBefore, p.getText());
+
+ // run all paragraphs, with null paragraph list provided
+ reset(callback);
+ notebookService.runAllParagraphs(
+ note1.getId(),
+ null,
+ context, callback);
+ verify(callback, times(2)).onSuccess(any(), any());
- // run paragraph synchronously via invalid code
- //TODO(zjffdu) must sleep for a while, otherwise will get wrong status. This should be due to
- //bug of job component.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
reset(callback);
runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "invalid_code",
new HashMap<>(), new HashMap<>(), false, true, context, callback);