Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/09 03:12:08 UTC
[2/2] zeppelin git commit: ZEPPELIN-3569. Improvement of FlinkInterpreter
ZEPPELIN-3569. Improvement of FlinkInterpreter
### What is this PR for?
This PR refactors the Flink interpreter and also introduces several main features. Here are the main changes (a short usage sketch follows the list):
1. Upgrade flink to 1.5.2
2. Support ZeppelinContext
3. Support %flink.sql
4. Support yarn mode
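For illustration, here is a minimal note-level sketch of how these features fit together. The statements are taken from the unit tests added in this commit; the paragraph layout itself is illustrative, not part of the change:

%flink
// benv, btenv and the ZeppelinContext variable z are bound when the interpreter opens
val ds = benv.fromElements((1, "jeff"), (2, "andy"))
btenv.registerDataSet("table_1", ds)
z.show(ds)  // rendered via Zeppelin's %table display system

%flink.sql
select * from table_1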
### What type of PR is it?
[Improvement | Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3569
### How should this be tested?
* Unit tests are added; an example of running them locally is sketched below.
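A possible local invocation (assuming a standard Maven build from the repository root; the surefire configuration in flink/pom.xml already exports FLINK_CONF_DIR for the tests):

mvn -pl flink test -Dtest=FlinkInterpreterTest,FlinkSQLInterpreterTest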
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3054 from zjffdu/ZEPPELIN-3569 and squashes the following commits:
a256a95fd [Jeff Zhang] ZEPPELIN-3569. Improvement of FlinkInterpreter
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1c5b38a9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1c5b38a9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1c5b38a9
Branch: refs/heads/master
Commit: 1c5b38a9afb211b4681a9fe99f9740add76d0f37
Parents: 8e5013c
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jun 5 14:28:21 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Aug 9 11:12:00 2018 +0800
----------------------------------------------------------------------
.travis.yml | 2 +-
bin/interpreter.sh | 15 +
flink/pom.xml | 45 +-
.../apache/zeppelin/flink/FlinkInterpreter.java | 420 +++----------------
.../zeppelin/flink/FlinkSQLInterpreter.java | 72 ++++
.../src/main/resources/interpreter-setting.json | 19 +
.../flink/FlinkSQLScalaInterpreter.scala | 42 ++
.../zeppelin/flink/FlinkScalaInterpreter.scala | 230 ++++++++++
.../zeppelin/flink/FlinkZeppelinContext.scala | 177 ++++++++
.../zeppelin/flink/FlinkInterpreterTest.java | 263 +++++++++---
.../zeppelin/flink/FlinkSQLInterpreterTest.java | 110 +++++
flink/src/test/resources/flink-conf.yaml | 247 +++++++++++
flink/src/test/resources/log4j.properties | 24 ++
.../launcher/StandardInterpreterLauncher.java | 6 +
zeppelin-server/pom.xml | 8 -
.../interpreter/SparkDownloadUtils.java | 136 ++++++
.../interpreter/remote/RemoteInterpreter.java | 4 +-
.../interpreter/FlinkIntegrationTest.java | 116 +++++
.../interpreter/SparkDownloadUtils.java | 110 -----
.../src/test/resources/flink-conf.yaml | 0
20 files changed, 1486 insertions(+), 560 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 5249ef2..5ddbaa5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,7 +41,7 @@ addons:
env:
global:
# Interpreters does not required by zeppelin-server integration tests
- - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java'
+ - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java'
matrix:
include:
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 48cf0f6..c895018 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -201,6 +201,21 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
else
echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
fi
+elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
+ if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+ ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+ export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
+ else
+ # autodetect HADOOP_CONF_HOME by heuristic
+ if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+ if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+ export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+ elif [[ -d "/etc/hadoop/conf" ]]; then
+ export HADOOP_CONF_DIR="/etc/hadoop/conf"
+ fi
+ fi
+ fi
+
fi
addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 70c076d..217813b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -27,7 +27,7 @@
</parent>
<groupId>org.apache.zeppelin</groupId>
- <artifactId>zeppelin-flink_2.10</artifactId>
+ <artifactId>zeppelin-flink</artifactId>
<packaging>jar</packaging>
<version>0.9.0-SNAPSHOT</version>
<name>Zeppelin: Flink</name>
@@ -36,15 +36,18 @@
<properties>
<!--library versions-->
<interpreter.name>flink</interpreter.name>
- <flink.version>1.1.3</flink.version>
+ <flink.version>1.5.2</flink.version>
<flink.akka.version>2.3.7</flink.akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
<!--plugin versions-->
<plugin.scalamaven.version>3.2.2</plugin.scalamaven.version>
<plugin.eclipse.version>2.8</plugin.eclipse.version>
<plugin.buildhelper.version>1.7</plugin.buildhelper.version>
<plugin.scalastyle.version>0.5.0</plugin.scalastyle.version>
+
</properties>
<dependencies>
@@ -90,38 +93,32 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- <version>${flink.akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
- <version>${flink.akka.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
- <version>${flink.akka.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_${scala.binary.version}</artifactId>
- <version>${flink.akka.version}</version>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>20.0</version>
</dependency>
<dependency>
@@ -282,6 +279,16 @@
</plugin>
<plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>always</forkMode>
+ <environmentVariables>
+ <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 9d66437..c14407d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,403 +14,90 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.flink;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.scala.FlinkILoop;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import scala.Console;
-import scala.Some;
-import scala.collection.JavaConversions;
-import scala.concurrent.duration.FiniteDuration;
-import scala.runtime.AbstractFunction0;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
+package org.apache.zeppelin.flink;
+import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-/**
- * Interpreter for Apache Flink (http://flink.apache.org).
- */
-public class FlinkInterpreter extends Interpreter {
- Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
- private ByteArrayOutputStream out;
- private Configuration flinkConf;
- private LocalFlinkMiniCluster localFlinkCluster;
- private FlinkILoop flinkIloop;
- private Map<String, Object> binder;
- private IMain imain;
-
- public FlinkInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- out = new ByteArrayOutputStream();
- flinkConf = new org.apache.flink.configuration.Configuration();
- Properties intpProperty = getProperties();
- for (Object k : intpProperty.keySet()) {
- String key = (String) k;
- String val = toString(intpProperty.get(key));
- flinkConf.setString(key, val);
- }
-
- if (localMode()) {
- startFlinkMiniCluster();
- }
-
- String[] externalJars = new String[0];
- String localRepo = getProperty("zeppelin.interpreter.localRepo");
- if (localRepo != null) {
- File localRepoDir = new File(localRepo);
- if (localRepoDir.exists()) {
- File[] files = localRepoDir.listFiles();
- if (files != null) {
- externalJars = new String[files.length];
- for (int i = 0; i < files.length; i++) {
- if (externalJars.length > 0) {
- externalJars[i] = files[i].getAbsolutePath();
- }
- }
- }
- }
- }
-
- flinkIloop = new FlinkILoop(getHost(),
- getPort(),
- flinkConf,
- new Some<>(externalJars),
- (BufferedReader) null,
- new PrintWriter(out));
-
- flinkIloop.settings_$eq(createSettings());
- flinkIloop.createInterpreter();
-
- imain = flinkIloop.intp();
-
- org.apache.flink.api.scala.ExecutionEnvironment benv =
- flinkIloop.scalaBenv();
- org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
- flinkIloop.scalaSenv();
-
- senv.getConfig().disableSysoutLogging();
- benv.getConfig().disableSysoutLogging();
-
- // prepare bindings
- imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
- Map<String, Object> binder = (Map<String, Object>) getLastObject();
-
- // import libraries
- imain.interpret("import scala.tools.nsc.io._");
- imain.interpret("import Properties.userHome");
- imain.interpret("import scala.compat.Platform.EOL");
-
- imain.interpret("import org.apache.flink.api.scala._");
- imain.interpret("import org.apache.flink.api.common.functions._");
-
-
- binder.put("benv", benv);
- imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
- + benv.getClass().getName() + "]");
-
- binder.put("senv", senv);
- imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
- + senv.getClass().getName() + "]");
-
- }
-
- private boolean localMode() {
- String host = getProperty("host");
- return host == null || host.trim().length() == 0 || host.trim().equals("local");
- }
-
- private String getHost() {
- if (localMode()) {
- return "localhost";
- } else {
- return getProperty("host");
- }
- }
-
- private int getPort() {
- if (localMode()) {
- return localFlinkCluster.getLeaderRPCPort();
- } else {
- return Integer.parseInt(getProperty("port"));
- }
- }
-
- private Settings createSettings() {
- URL[] urls = getClassloaderUrls();
- Settings settings = new Settings();
-
- // set classpath
- PathSetting pathSettings = settings.classpath();
- String classpath = "";
- List<File> paths = currentClassPath();
- for (File f : paths) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += f.getAbsolutePath();
- }
-
- if (urls != null) {
- for (URL u : urls) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += u.getFile();
- }
- }
-
- pathSettings.v_$eq(classpath);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
- settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
- .getContextClassLoader()));
- BooleanSetting b = (BooleanSetting) settings.usejavacp();
- b.v_$eq(true);
- settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
- // To prevent 'File name too long' error on some file system.
- MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
- numClassFileSetting.v_$eq(128);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
- numClassFileSetting);
-
- return settings;
- }
-
-
- private List<File> currentClassPath() {
- List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
- String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
- if (cps != null) {
- for (String cp : cps) {
- paths.add(new File(cp));
- }
- }
- return paths;
- }
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
- private List<File> classPath(ClassLoader cl) {
- List<File> paths = new LinkedList<>();
- if (cl == null) {
- return paths;
- }
+public class FlinkInterpreter extends Interpreter {
- if (cl instanceof URLClassLoader) {
- URLClassLoader ucl = (URLClassLoader) cl;
- URL[] urls = ucl.getURLs();
- if (urls != null) {
- for (URL url : urls) {
- paths.add(new File(url.getFile()));
- }
- }
- }
- return paths;
- }
+ private FlinkScalaInterpreter innerIntp;
+ private FlinkZeppelinContext z;
- public Object getLastObject() {
- Object obj = imain.lastRequest().lineRep().call(
- "$result",
- JavaConversions.asScalaBuffer(new LinkedList<>()));
- return obj;
+ public FlinkInterpreter(Properties properties) {
+ super(properties);
+ this.innerIntp = new FlinkScalaInterpreter(getProperties());
}
@Override
- public void close() {
- flinkIloop.closeInterpreter();
+ public void open() throws InterpreterException {
+ this.innerIntp.open();
- if (localMode()) {
- stopFlinkMiniCluster();
- }
+ // bind ZeppelinContext
+ int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
+ this.z = new FlinkZeppelinContext(innerIntp.getBatchTableEnviroment(),
+ getInterpreterGroup().getInterpreterHookRegistry(), maxRow);
+ List<String> modifiers = new ArrayList<>();
+ modifiers.add("@transient");
+ this.innerIntp.bind("z", z.getClass().getCanonicalName(), z, modifiers);
}
@Override
- public InterpreterResult interpret(String line, InterpreterContext context) {
- if (line == null || line.trim().length() == 0) {
- return new InterpreterResult(Code.SUCCESS);
- }
-
- InterpreterResult result = interpret(line.split("\n"), context);
- return result;
+ public void close() throws InterpreterException {
+ this.innerIntp.close();
}
- public InterpreterResult interpret(String[] lines, InterpreterContext context) {
- final IMain imain = flinkIloop.intp();
-
- String[] linesToRun = new String[lines.length + 1];
- for (int i = 0; i < lines.length; i++) {
- linesToRun[i] = lines[i];
- }
- linesToRun[lines.length] = "print(\"\")";
-
- System.setOut(new PrintStream(out));
- out.reset();
- Code r = null;
-
- String incomplete = "";
- boolean inComment = false;
-
- for (int l = 0; l < linesToRun.length; l++) {
- final String s = linesToRun[l];
- // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
- if (l + 1 < linesToRun.length) {
- String nextLine = linesToRun[l + 1].trim();
- boolean continuation = false;
- if (nextLine.isEmpty()
- || nextLine.startsWith("//") // skip empty line or comment
- || nextLine.startsWith("}")
- || nextLine.startsWith("object")) { // include "} object" for Scala companion object
- continuation = true;
- } else if (!inComment && nextLine.startsWith("/*")) {
- inComment = true;
- continuation = true;
- } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
- inComment = false;
- continuation = true;
- } else if (nextLine.length() > 1
- && nextLine.charAt(0) == '.'
- && nextLine.charAt(1) != '.' // ".."
- && nextLine.charAt(1) != '/') { // "./"
- continuation = true;
- } else if (inComment) {
- continuation = true;
- }
- if (continuation) {
- incomplete += s + "\n";
- continue;
- }
- }
-
- final String currentCommand = incomplete;
-
- scala.tools.nsc.interpreter.Results.Result res = null;
- try {
- res = Console.withOut(
- System.out,
- new AbstractFunction0<Results.Result>() {
- @Override
- public Results.Result apply() {
- return imain.interpret(currentCommand + s);
- }
- });
- } catch (Exception e) {
- logger.info("Interpreter exception", e);
- return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
- }
-
- r = getResultCode(res);
-
- if (r == Code.ERROR) {
- return new InterpreterResult(r, out.toString());
- } else if (r == Code.INCOMPLETE) {
- incomplete += s + "\n";
- } else {
- incomplete = "";
- }
- }
-
- if (r == Code.INCOMPLETE) {
- return new InterpreterResult(r, "Incomplete expression");
- } else {
- return new InterpreterResult(r, out.toString());
- }
- }
-
- private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
- if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
- return Code.SUCCESS;
- } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
- return Code.INCOMPLETE;
- } else {
- return Code.ERROR;
- }
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context)
+ throws InterpreterException {
+ this.z.setInterpreterContext(context);
+ this.z.setGui(context.getGui());
+ this.z.setNoteGui(context.getNoteGui());
+ return innerIntp.interpret(st, context);
}
@Override
- public void cancel(InterpreterContext context) {
- if (localMode()) {
- // In localMode we can cancel all running jobs,
- // because the local cluster can only run one job at the time.
- for (JobID job : this.localFlinkCluster.getCurrentlyRunningJobsJava()) {
- logger.info("Stop job: " + job);
- cancelJobLocalMode(job);
- }
- }
- }
+ public void cancel(InterpreterContext context) throws InterpreterException {
- private void cancelJobLocalMode(JobID jobID){
- FiniteDuration timeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration());
- ActorGateway leader = this.localFlinkCluster.getLeaderGateway(timeout);
- leader.ask(new JobManagerMessages.CancelJob(jobID), timeout);
}
@Override
- public FormType getFormType() {
+ public FormType getFormType() throws InterpreterException {
return FormType.NATIVE;
}
@Override
- public int getProgress(InterpreterContext context) {
+ public int getProgress(InterpreterContext context) throws InterpreterException {
return 0;
}
@Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return new LinkedList<>();
+ public List<InterpreterCompletion> completion(String buf,
+ int cursor,
+ InterpreterContext interpreterContext)
+ throws InterpreterException {
+ return innerIntp.completion(buf, cursor, interpreterContext);
}
- private void startFlinkMiniCluster() {
- localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
-
- try {
- localFlinkCluster.start(true);
- } catch (Exception e){
- throw new RuntimeException("Could not start Flink mini cluster.", e);
- }
+ FlinkScalaInterpreter getInnerScalaInterpreter() {
+ return this.innerIntp;
}
- private void stopFlinkMiniCluster() {
- if (localFlinkCluster != null) {
- localFlinkCluster.stop();
- localFlinkCluster = null;
- }
+ ExecutionEnvironment getExecutionEnviroment() {
+ return this.innerIntp.getExecutionEnviroment();
}
- static final String toString(Object o) {
- return (o instanceof String) ? (String) o : "";
+ FlinkZeppelinContext getZeppelinContext() {
+ return this.z;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
new file mode 100644
index 0000000..1ac3547
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+import java.util.Properties;
+
+public class FlinkSQLInterpreter extends Interpreter {
+
+ private FlinkSQLScalaInterpreter sqlScalaInterpreter;
+
+ public FlinkSQLInterpreter(Properties properties) {
+ super(properties);
+ }
+
+
+ @Override
+ public void open() throws InterpreterException {
+ FlinkInterpreter flinkInterpreter =
+ getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+ FlinkZeppelinContext z = flinkInterpreter.getZeppelinContext();
+ int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
+ this.sqlScalaInterpreter = new FlinkSQLScalaInterpreter(
+ flinkInterpreter.getInnerScalaInterpreter(), z, maxRow);
+ }
+
+ @Override
+ public void close() throws InterpreterException {
+
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context)
+ throws InterpreterException {
+ return sqlScalaInterpreter.interpret(st, context);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+
+ }
+
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json
index f1a04bf..1463e3d 100644
--- a/flink/src/main/resources/interpreter-setting.json
+++ b/flink/src/main/resources/interpreter-setting.json
@@ -23,5 +23,24 @@
"language": "scala",
"editOnDblClick": false
}
+ },
+
+ {
+ "group": "flink",
+ "name": "sql",
+ "className": "org.apache.zeppelin.flink.FlinkSQLInterpreter",
+ "properties": {
+ "zeppelin.flink.maxResult": {
+ "envName": "zeppelin.flink.maxResult",
+ "propertyName": "zeppelin.flink.maxResult",
+ "defaultValue": "1000",
+ "description": "max number of row returned by sql interpreter.",
+ "type": "number"
+ }
+ },
+ "editor": {
+ "language": "sql",
+ "editOnDblClick": false
+ }
}
]
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
new file mode 100644
index 0000000..1694a44
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+
+class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter,
+ z: FlinkZeppelinContext,
+ maxRow: Int) {
+
+ private var btenv: BatchTableEnvironment = scalaInterpreter.getBatchTableEnviroment()
+
+ def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+ try {
+ val table: Table = this.btenv.sql(code)
+ val result = z.showData(table)
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, result)
+ } catch {
+ case e: Exception =>
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ "Fail to fetch result: " + e.getMessage)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
new file mode 100644
index 0000000..0653c2a
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.io.BufferedReader
+import java.nio.file.Files
+import java.util.Properties
+
+import org.apache.flink.api.scala.FlinkShell._
+import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.tools.nsc.interpreter.{JPrintWriter, SimpleReader}
+
+class FlinkScalaInterpreter(val properties: Properties) {
+
+ lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
+ private var flinkILoop: FlinkILoop = _
+ private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster],
+ ClusterClient[_]]] = _
+ private var scalaCompleter: ScalaCompleter = _
+ private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+
+ private var benv: ExecutionEnvironment = _
+ private var senv: StreamExecutionEnvironment = _
+ private var btenv: BatchTableEnvironment = _
+ private var stenv: StreamTableEnvironment = _
+ private var z: FlinkZeppelinContext = _
+
+ def open(): Unit = {
+ var config = Config(executionMode = ExecutionMode.withName(
+ properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase))
+ val containerNum = Integer.parseInt(properties.getProperty("flink.yarn.num_container", "1"))
+ config = config.copy(yarnConfig =
+ Some(ensureYarnConfig(config).copy(containers = Some(containerNum))))
+ val configuration = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR"))
+ val replOut = new JPrintWriter(interpreterOutput, true)
+
+ val (iLoop, cluster) = try {
+ val (host, port, cluster) = fetchConnectionInfo(configuration, config)
+ val conf = cluster match {
+ case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
+ case Some(Left(Right(_))) => configuration
+ case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
+ case None => configuration
+ }
+ LOGGER.info(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
+ val repl = new FlinkILoop(host, port, conf, config.externalJars, None, replOut)
+
+ (repl, cluster)
+ } catch {
+ case e: IllegalArgumentException =>
+ println(s"Error: ${e.getMessage}")
+ sys.exit()
+ }
+
+ this.flinkILoop = iLoop
+ this.cluster = cluster
+ val settings = new Settings()
+ settings.usejavacp.value = true
+ settings.Yreplsync.value = true
+
+ val outputDir = Files.createTempDirectory("flink-repl");
+ val interpArguments = List(
+ "-Yrepl-class-based",
+ "-Yrepl-outdir", s"${outputDir.toFile.getAbsolutePath}"
+ )
+ settings.processArguments(interpArguments, true)
+
+ flinkILoop.settings = settings
+ flinkILoop.createInterpreter()
+
+ val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0")
+ .asInstanceOf[Option[BufferedReader]]
+ val reader = in0.fold(flinkILoop.chooseReader(settings))(r =>
+ SimpleReader(r, replOut, interactive = true))
+
+ flinkILoop.in = reader
+ flinkILoop.initializeSynchronous()
+ callMethod(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$loopPostInit")
+ this.scalaCompleter = reader.completion.completer()
+
+ this.benv = flinkILoop.scalaBenv
+ this.senv = flinkILoop.scalaSenv
+ this.btenv = TableEnvironment.getTableEnvironment(this.benv)
+ this.stenv = TableEnvironment.getTableEnvironment(this.senv)
+ bind("btenv", btenv.getClass.getCanonicalName, btenv, List("@transient"))
+ bind("stenv", stenv.getClass.getCanonicalName, stenv, List("@transient"))
+
+ if (java.lang.Boolean.parseBoolean(
+ properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) {
+ this.benv.getConfig.disableSysoutLogging()
+ this.senv.getConfig.disableSysoutLogging()
+ }
+ }
+
+ // for use in java side
+ protected def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: java.util.List[String]): Unit = {
+ flinkILoop.beQuietDuring {
+ flinkILoop.bind(name, tpe, value, modifier.asScala.toList)
+ }
+ }
+
+ protected def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: List[String]): Unit = {
+ flinkILoop.beQuietDuring {
+ flinkILoop.bind(name, tpe, value, modifier)
+ }
+ }
+
+ protected def completion(buf: String,
+ cursor: Int,
+ context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+ val completions = scalaCompleter.complete(buf, cursor).candidates
+ .map(e => new InterpreterCompletion(e, e, null))
+ scala.collection.JavaConversions.seqAsJavaList(completions)
+ }
+
+ protected def callMethod(obj: Object, name: String): Object = {
+ callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
+ }
+
+ protected def callMethod(obj: Object, name: String,
+ parameterTypes: Array[Class[_]],
+ parameters: Array[Object]): Object = {
+ val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+ method.setAccessible(true)
+ method.invoke(obj, parameters: _ *)
+ }
+
+
+ protected def getField(obj: Object, name: String): Object = {
+ val field = obj.getClass.getField(name)
+ field.setAccessible(true)
+ field.get(obj)
+ }
+
+ def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+
+ val originalOut = System.out
+
+ def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
+ Console.withOut(interpreterOutput) {
+ System.setOut(Console.out)
+ interpreterOutput.setInterpreterOutput(context.out)
+ interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
+ context.out.clear()
+
+ val status = flinkILoop.interpret(code) match {
+ case scala.tools.nsc.interpreter.IR.Success =>
+ scala.tools.nsc.interpreter.IR.Success
+ case scala.tools.nsc.interpreter.IR.Error =>
+ scala.tools.nsc.interpreter.IR.Error
+ case scala.tools.nsc.interpreter.IR.Incomplete =>
+ // add print("") at the end in case the last line is comment which lead to INCOMPLETE
+ flinkILoop.interpret(code + "\nprint(\"\")")
+ }
+ context.out.flush()
+ status
+ }
+ }
+ // reset the java stdout
+ System.setOut(originalOut)
+
+ val lastStatus = _interpret(code) match {
+ case scala.tools.nsc.interpreter.IR.Success =>
+ InterpreterResult.Code.SUCCESS
+ case scala.tools.nsc.interpreter.IR.Error =>
+ InterpreterResult.Code.ERROR
+ case scala.tools.nsc.interpreter.IR.Incomplete =>
+ InterpreterResult.Code.INCOMPLETE
+ }
+ new InterpreterResult(lastStatus)
+ }
+
+ def close(): Unit = {
+ if (flinkILoop != null) {
+ flinkILoop.close()
+ }
+ if (cluster != null) {
+ cluster match {
+ case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
+ case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
+ case Some(Right(yarnCluster)) => yarnCluster.shutdown()
+ case _ =>
+ }
+ }
+ }
+
+ def getExecutionEnviroment(): ExecutionEnvironment = this.benv
+
+ def getStreamingExecutionEnviroment(): StreamExecutionEnvironment = this.senv
+
+ def getBatchTableEnviroment(): BatchTableEnvironment = this.btenv
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
new file mode 100644
index 0000000..5246445
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.util
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+import org.apache.zeppelin.annotation.ZeppelinApi
+import org.apache.zeppelin.display.AngularObjectWatcher
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption
+import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext,
+ InterpreterHookRegistry}
+
+import scala.collection.{JavaConversions, Seq}
+
+
+/**
+ * ZeppelinContext for Flink
+ */
+class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
+ val hooks2: InterpreterHookRegistry,
+ val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) {
+
+ private val interpreterClassMap = Map(
+ "flink" -> "org.apache.zeppelin.flink.FlinkInterpreter",
+ "sql" -> "org.apache.zeppelin.flink.FlinkSqlInterpreter"
+ )
+
+ private val supportedClasses = Seq(classOf[DataSet[_]])
+
+ override def getSupportedClasses: util.List[Class[_]] =
+ JavaConversions.seqAsJavaList(supportedClasses)
+
+ override def getInterpreterClassMap: util.Map[String, String] =
+ JavaConversions.mapAsJavaMap(interpreterClassMap)
+
+ override def showData(obj: Any): String = {
+ def showTable(table: Table): String = {
+ val columnNames: Array[String] = table.getSchema.getColumnNames
+ val dsRow: DataSet[Row] = btenv.toDataSet[Row](table)
+ val builder: StringBuilder = new StringBuilder("%table ")
+ builder.append(columnNames.mkString("\t"))
+ builder.append("\n")
+ val rows = dsRow.first(maxResult).collect()
+ for (row <- rows) {
+ var i = 0;
+ while (i < row.getArity) {
+ builder.append(row.getField(i))
+ i += 1
+ if (i != row.getArity) {
+ builder.append("\t");
+ }
+ }
+ builder.append("\n")
+ }
+ // append %text at the end, otherwise the following output will be put in table as well.
+ builder.append("\n%text ")
+ builder.toString()
+ }
+
+ if (obj.isInstanceOf[DataSet[_]]) {
+ val ds = obj.asInstanceOf[DataSet[_]]
+ val table = btenv.fromDataSet(ds)
+ showTable(table)
+ } else if (obj.isInstanceOf[Table]) {
+ showTable(obj.asInstanceOf[Table])
+ } else {
+ obj.toString
+ }
+ }
+
+
+ @ZeppelinApi
+ def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
+
+ @ZeppelinApi
+ def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any =
+ select(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray)
+
+ @ZeppelinApi
+ def checkbox(name: String, options: Seq[(AnyRef, String)]): Seq[Any] = {
+ val javaResult = checkbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)),
+ options.map(e => new ParamOption(e._1, e._2)).toArray)
+ JavaConversions.asScalaBuffer(javaResult)
+ }
+
+ @ZeppelinApi
+ def checkbox(name: String, defaultChecked: Seq[AnyRef], options: Seq[(Any, String)]): Seq[Any] = {
+ val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked)
+ val javaResult = checkbox(name, defaultCheckedList,
+ options.map(e => new ParamOption(e._1, e._2)).toArray)
+ JavaConversions.asScalaBuffer(javaResult)
+ }
+
+ @ZeppelinApi
+ def noteSelect(name: String, options: Seq[(Any, String)]): Any = noteSelect(name, "", options)
+
+ @ZeppelinApi
+ def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, String)]): AnyRef =
+ noteSelect(name, defaultValue, options.map(e => new ParamOption(e._1, e._2)).toArray)
+
+ @ZeppelinApi
+ def noteCheckbox(name: String, options: Seq[(AnyRef, String)]): Seq[AnyRef] = {
+ val javaResulst = noteCheckbox(name, JavaConversions.seqAsJavaList(options.map(e => e._1)),
+ options.map(e => new ParamOption(e._1, e._2)).toArray)
+ JavaConversions.asScalaBuffer(javaResulst)
+ }
+
+ @ZeppelinApi
+ def noteCheckbox(name: String,
+ defaultChecked: Seq[AnyRef],
+ options: Seq[(AnyRef, String)]): Seq[AnyRef] = {
+ val defaultCheckedList = JavaConversions.seqAsJavaList(defaultChecked)
+ val javaResult = noteCheckbox(name, defaultCheckedList,
+ options.map(e => new ParamOption(e._1, e._2)).toArray)
+ JavaConversions.asScalaBuffer(javaResult)
+ }
+
+ @ZeppelinApi def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+ angularWatch(name, interpreterContext.getNoteId, func)
+ }
+
+ @deprecated def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+ angularWatch(name, null, func)
+ }
+
+ @ZeppelinApi def angularWatch(name: String,
+ func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+ angularWatch(name, interpreterContext.getNoteId, func)
+ }
+
+ @deprecated def angularWatchGlobal(name: String,
+ func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+ angularWatch(name, null, func)
+ }
+
+ private def angularWatch(name: String, noteId: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+ val w = new AngularObjectWatcher(getInterpreterContext) {
+ override def watch(oldObject: Any, newObject: AnyRef, context: InterpreterContext): Unit = {
+ func(newObject, newObject)
+ }
+ }
+ angularWatch(name, noteId, w)
+ }
+
+ private def angularWatch(name: String, noteId: String,
+ func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+ val w = new AngularObjectWatcher(getInterpreterContext) {
+ override def watch(oldObject: AnyRef,
+ newObject: AnyRef,
+ context: InterpreterContext): Unit = {
+ func(oldObject, newObject, context)
+ }
+ }
+ angularWatch(name, noteId, w)
+ }
+}
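A note on the showData output above: it uses Zeppelin's display-system prefixes, i.e. a leading "%table " followed by tab-separated column names and rows, and a trailing "%text " so that subsequent output is not absorbed into the table. For a single-column DataSet the raw string would look roughly like this (matching the expectations in FlinkInterpreterTest below):

%table f0
1
2
3

%text 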
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 128f567..0c42139 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,84 +16,240 @@
*/
package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.ui.CheckBox;
+import org.apache.zeppelin.display.ui.Select;
+import org.apache.zeppelin.display.ui.TextBox;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class FlinkInterpreterTest {
- private static FlinkInterpreter flink;
- private static InterpreterContext context;
+ private FlinkInterpreter interpreter;
+ private InterpreterContext context;
+
+ // catch the streaming output in onAppend
+ private volatile String output = "";
+ // catch the interpreter output in onUpdate
+ private List<InterpreterResultMessageOutput> messageOutput;
- @BeforeClass
- public static void setUp() {
+ @Before
+ public void setUp() throws InterpreterException {
Properties p = new Properties();
- flink = new FlinkInterpreter(p);
- flink.open();
+ interpreter = new FlinkInterpreter(p);
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ interpreter.setInterpreterGroup(intpGroup);
+ interpreter.open();
context = InterpreterContext.builder().build();
+ InterpreterContext.set(context);
}
- @AfterClass
- public static void tearDown() {
- flink.close();
- }
-
- @Test
- public void testNextLineInvocation() {
- assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret("\"123\"\n.toInt", context)
- .code());
- }
-
- @Test
- public void testNextLineComments() {
- assertEquals(InterpreterResult.Code.SUCCESS,
- flink.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+ @After
+ public void tearDown() throws InterpreterException {
+ interpreter.close();
}
@Test
- public void testNextLineCompanionObject() {
- String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter " +
- "{\n def apply(x: Long) = new Counter()\n}";
- assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret(code, context).code());
+ public void testBasicScala() throws InterpreterException, IOException {
+ InterpreterResult result = interpreter.interpret("val a=\"hello world\"",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("a: String = hello world\n", output);
+
+ result = interpreter.interpret("print(a)", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("hello world", output);
+
+ // java stdout
+ result = interpreter.interpret("System.out.print(a)", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("hello world", output);
+
+ // incomplete
+ result = interpreter.interpret("println(a", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.INCOMPLETE, result.code());
+
+ // syntax error
+ result = interpreter.interpret("println(b)", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertTrue(output.contains("not found: value b"));
+
+ // multiple line
+ result = interpreter.interpret("\"123\".\ntoInt", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // single line comment
+ result = interpreter.interpret("/*comment here*/", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // multiple line comment
+ result = interpreter.interpret("/*line 1 \n line 2*/",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // test function
+ result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = interpreter.interpret("print(add(1,2))", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // companion object
+ result = interpreter.interpret("class Counter {\n " +
+ "var value: Long = 0} \n" +
+ "object Counter {\n def apply(x: Long) = new Counter()\n}", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // case class
+ result = interpreter.interpret(
+ "case class Bank(age:Integer, job:String, marital : String, education : String," +
+ " balance : Integer)\n",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // ZeppelinContext
+ context = getInterpreterContext();
+ result = interpreter.interpret("val ds = benv.fromElements(1,2,3)\nz.show(ds)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, messageOutput.get(0).getType());
+ assertEquals("f0\n" +
+ "1\n" +
+ "2\n" +
+ "3\n", messageOutput.get(0).toInterpreterResultMessage().getData());
+
+ context = getInterpreterContext();
+ result = interpreter.interpret("z.input(\"name\", \"default_name\")",
+ context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, context.getGui().getForms().size());
+ assertTrue(context.getGui().getForms().get("name") instanceof TextBox);
+ TextBox textBox = (TextBox) context.getGui().getForms().get("name");
+ assertEquals("name", textBox.getName());
+ assertEquals("default_name", textBox.getDefaultValue());
+
+ context = getInterpreterContext();
+ result = interpreter.interpret("z.checkbox(\"checkbox_1\", " +
+ "Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, context.getGui().getForms().size());
+ assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
+ CheckBox checkBox = (CheckBox) context.getGui().getForms().get("checkbox_1");
+ assertEquals("checkbox_1", checkBox.getName());
+ assertEquals(1, checkBox.getDefaultValue().length);
+ assertEquals("value_2", checkBox.getDefaultValue()[0]);
+ assertEquals(2, checkBox.getOptions().length);
+ assertEquals("value_1", checkBox.getOptions()[0].getValue());
+ assertEquals("name_1", checkBox.getOptions()[0].getDisplayName());
+ assertEquals("value_2", checkBox.getOptions()[1].getValue());
+ assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());
+
+ context = getInterpreterContext();
+ result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), " +
+ "Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, context.getGui().getForms().size());
+ assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
+ Select select = (Select) context.getGui().getForms().get("select_1");
+ assertEquals("select_1", select.getName());
+ // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2',
+ // but it is List(value_2)
+ // assertEquals("value_2", select.getDefaultValue());
+ assertEquals(2, select.getOptions().length);
+ assertEquals("value_1", select.getOptions()[0].getValue());
+ assertEquals("name_1", select.getOptions()[0].getDisplayName());
+ assertEquals("value_2", select.getOptions()[1].getValue());
+ assertEquals("name_2", select.getOptions()[1].getDisplayName());
}
@Test
- public void testSimpleStatement() {
- InterpreterResult result = flink.interpret("val a=1", context);
- result = flink.interpret("print(a)", context);
- assertEquals("1", result.message().get(0).getData());
+ public void testCompletion() throws InterpreterException {
+ InterpreterResult result = interpreter.interpret("val a=\"hello world\"",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("a: String = hello world\n", output);
+
+ List<InterpreterCompletion> completions = interpreter.completion("a.", 2,
+ getInterpreterContext());
+ assertTrue(completions.size() > 0);
}
- @Test
- public void testSimpleStatementWithSystemOutput() {
- InterpreterResult result = flink.interpret("val a=1", context);
- result = flink.interpret("System.out.print(a)", context);
- assertEquals("1", result.message().get(0).getData());
- }
+ // Disable it for now as there's extra std output from flink shell.
@Test
- public void testWordCount() {
- flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
- flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" +
- ".map { (_, 1) }.groupBy(0).sum(1)", context);
- InterpreterResult result = flink.interpret("counts.print()", context);
- assertEquals(Code.SUCCESS, result.code());
+ public void testWordCount() throws InterpreterException, IOException {
+ interpreter.interpret("val text = benv.fromElements(\"To be or not to be\")",
+ getInterpreterContext());
+ interpreter.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" +
+ ".map { (_, 1) }.groupBy(0).sum(1)", getInterpreterContext());
+ InterpreterResult result = interpreter.interpret("counts.print()", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"};
Arrays.sort(expectedCounts);
- String[] counts = result.message().get(0).getData().split("\n");
+ String[] counts = output.split("\n");
Arrays.sort(counts);
assertArrayEquals(expectedCounts, counts);
}
+
+ private InterpreterContext getInterpreterContext() {
+ output = "";
+ messageOutput = new ArrayList<>();
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+ .build();
+ context.out = new InterpreterOutput(
+ new InterpreterOutputListener() {
+ @Override
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
+
+ @Override
+ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+ try {
+ output = out.toInterpreterResultMessage().getData();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onUpdate(int index, InterpreterResultMessageOutput out) {
+ messageOutput.add(out);
+ }
+ });
+ return context;
+ }
}
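
For reference, the ZeppelinContext test above maps to the following notebook
usage (a sketch assembled only from the statements the test interprets):

    %flink
    z.select("select_1", Seq("value_2"),
      Seq(("value_1", "name_1"), ("value_2", "name_2")))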
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
new file mode 100644
index 0000000..6993540
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class FlinkSQLInterpreterTest {
+
+ private FlinkInterpreter interpreter;
+ private FlinkSQLInterpreter sqlInterpreter;
+ private InterpreterContext context;
+
+ // catch the streaming output in onAppend
+ private volatile String output = "";
+ // catch the interpreter output in onUpdate
+ private InterpreterResultMessageOutput messageOutput;
+
+ @Before
+ public void setUp() throws InterpreterException {
+ Properties p = new Properties();
+ interpreter = new FlinkInterpreter(p);
+ sqlInterpreter = new FlinkSQLInterpreter(p);
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ interpreter.setInterpreterGroup(intpGroup);
+ sqlInterpreter.setInterpreterGroup(intpGroup);
+ intpGroup.addInterpreterToSession(interpreter, "session_1");
+ intpGroup.addInterpreterToSession(sqlInterpreter, "session_1");
+
+ interpreter.open();
+ sqlInterpreter.open();
+ context = InterpreterContext.builder().build();
+ }
+
+ @Test
+ public void testSQLInterpreter() throws InterpreterException {
+ InterpreterResult result = interpreter.interpret(
+ "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = interpreter.interpret("btenv.registerDataSet(\"table_1\", ds)",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = sqlInterpreter.interpret("select * from table_1", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("_1\t_2\n" +
+ "1\tjeff\n" +
+ "2\tandy\n", result.message().get(0).getData());
+ }
+
+ private InterpreterContext getInterpreterContext() {
+ output = "";
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+ .build();
+ context.out = new InterpreterOutput(
+ new InterpreterOutputListener() {
+ @Override
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
+
+ @Override
+ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+ try {
+ output = out.toInterpreterResultMessage().getData();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onUpdate(int index, InterpreterResultMessageOutput out) {
+ messageOutput = out;
+ }
+ });
+ return context;
+ }
+}
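
In notebook terms, testSQLInterpreter exercises the new %flink.sql support
roughly as follows (a sketch using only the statements the test runs):

    %flink
    val ds = benv.fromElements((1, "jeff"), (2, "andy"))
    btenv.registerDataSet("table_1", ds)

    %flink.sql
    select * from table_1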
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..1041d0f
--- /dev/null
+++ b/flink/src/test/resources/flink-conf.yaml
@@ -0,0 +1,247 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+#mode: legacy
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The heap size for the JobManager JVM
+
+jobmanager.heap.mb: 1024
+
+
+# The heap size for the TaskManager JVM
+
+taskmanager.heap.mb: 1024
+
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+# The address under which the web-based runtime monitor listens.
+#
+#jobmanager.web.address: 0.0.0.0
+
+# The port under which the web-based runtime monitor listens.
+# A value of -1 deactivates the web server.
+
+rest.port: 8081
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#jobmanager.web.submit.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# Specify whether TaskManager's managed memory should be allocated when starting
+# up (true) or when memory is requested.
+#
+# We recommend setting this value to 'true' only in setups for pure batch
+# processing (DataSet API). Streaming setups currently do not use the TaskManager's
+# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
+# while the 'memory' and 'filesystem' backends explicitly keep data as objects
+# to save on serialization cost.
+#
+# taskmanager.memory.preallocate: false
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, teh default max is 1GB.
+#
+# taskmanager.network.memory.fraction: 0.1
+# taskmanager.network.memory.min: 67108864
+# taskmanager.network.memory.max: 1073741824
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The settings below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts to use.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# The configurations below apply if the ZK ensemble is configured for security
+
+# Override the configuration below to provide a custom ZK service name if one is configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
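
This is essentially Flink's stock flink-conf.yaml, checked in so the tests run
against a known configuration. A minimal sketch of how such a file is read,
assuming Flink's standard GlobalConfiguration loader and using the test
resources directory as an illustrative path:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;

    // Load flink-conf.yaml from the given directory (normally $FLINK_CONF_DIR).
    Configuration conf = GlobalConfiguration.loadConfiguration("flink/src/test/resources");
    int restPort = conf.getInteger("rest.port", 8081);  // 8081, per the file above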
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..65b6d36
--- /dev/null
+++ b/flink/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+
+log4j.logger.org.apache.zeppelin.flink=WARN
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index 10ab354..9c7a0b2 100644
--- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -94,6 +94,12 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
if (RemoteInterpreterUtils.isEnvString((String) key)) {
env.put((String) key, context.getProperties().getProperty((String) key));
}
+ // TODO(zjffdu) move this to FlinkInterpreterLauncher
+ if (key.toString().equals("FLINK_HOME")) {
+ String flinkHome = context.getProperties().get(key).toString();
+ env.put("FLINK_CONF_DIR", flinkHome + "/conf");
+ env.put("FLINK_LIB_DIR", flinkHome + "/lib");
+ }
}
return env;
}
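
The FLINK_HOME branch above derives two environment variables for the
interpreter process. A hypothetical example (the path is illustrative only):

    FLINK_HOME=/opt/flink  ->  FLINK_CONF_DIR=/opt/flink/conf
                               FLINK_LIB_DIR=/opt/flink/lib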
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 9bc2a4c..4eaedb2 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -368,14 +368,6 @@
</dependency>
<dependency>
- <groupId>org.apache.zeppelin</groupId>
- <artifactId>zeppelin-zengine</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.bitbucket.cowwoc</groupId>
<artifactId>diff-match-patch</artifactId>
<version>1.1</version>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
new file mode 100644
index 0000000..157b989
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
@@ -0,0 +1,136 @@
+package org.apache.zeppelin.interpreter;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for downloading Spark and Flink distributions. This is used by
+ * the Spark and Flink integration tests.
+ */
+public class SparkDownloadUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);
+
+ private static final String downloadFolder = System.getProperty("user.home") + "/.cache/spark";
+
+ static {
+ try {
+ FileUtils.forceMkdir(new File(downloadFolder));
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to create downloadFolder: " + downloadFolder, e);
+ }
+ }
+
+
+ public static String downloadSpark(String version) {
+ File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+ if (targetSparkHomeFolder.exists()) {
+ LOGGER.info("Skip to download spark as it is already downloaded.");
+ return targetSparkHomeFolder.getAbsolutePath();
+ }
+ // Try mirrors a few times until one succeeds
+ boolean downloaded = false;
+ for (int i = 0; i < 3; i++) {
+ try {
+ String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
+ File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
+ String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz";
+ runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+ runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+ downloaded = true;
+ break;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to download Spark", e);
+ }
+ }
+ // fallback to use apache archive
+ // https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz
+ if (!downloaded) {
+ File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
+ String downloadURL =
+ "https://archive.apache.org/dist/spark/spark-"
+ + version
+ + "/spark-"
+ + version
+ + "-bin-hadoop2.6.tgz";
+ try {
+ runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+ runShellCommand(
+ new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to download spark " + version, e);
+ }
+ }
+ return targetSparkHomeFolder.getAbsolutePath();
+ }
+
+ public static String downloadFlink(String version) {
+ File targetFlinkHomeFolder = new File(downloadFolder + "/flink-" + version);
+ if (targetFlinkHomeFolder.exists()) {
+ LOGGER.info("Skip to download flink as it is already downloaded.");
+ return targetFlinkHomeFolder.getAbsolutePath();
+ }
+ // Try mirrors a few times until one succeeds
+ boolean downloaded = false;
+ for (int i = 0; i < 3; i++) {
+ try {
+ String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
+ File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz");
+ String downloadURL = preferredMirror + "/flink/flink-" + version + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz";
+ runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+ runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+ downloaded = true;
+ break;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to download Flink", e);
+ }
+ }
+ // Unlike downloadSpark there is no archive fallback here, so fail fast instead
+ // of returning a path that does not exist.
+ if (!downloaded) {
+ throw new RuntimeException("Failed to download flink " + version);
+ }
+ return targetFlinkHomeFolder.getAbsolutePath();
+ }
+
+ private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
+ LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
+ Process process = Runtime.getRuntime().exec(commands);
+ StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
+ StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
+ errorGobbler.start();
+ outputGobbler.start();
+ if (process.waitFor() != 0) {
+ throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
+ }
+ LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " "));
+ }
+
+ private static class StreamGobbler extends Thread {
+ InputStream is;
+
+ // Reads everything from the given InputStream until it is exhausted.
+ StreamGobbler(InputStream is) {
+ this.is = is;
+ }
+
+ public void run() {
+ try {
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line = null;
+ long startTime = System.currentTimeMillis();
+ while ( (line = br.readLine()) != null) {
+ // log at most one line every 5 seconds to avoid flooding the log
+ if ((System.currentTimeMillis() - startTime) > 5000) {
+ LOGGER.info(line);
+ startTime = System.currentTimeMillis();
+ }
+ }
+ } catch (IOException ioe) {
+ LOGGER.error("Failed to read process output", ioe);
+ }
+ }
+ }
+}
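
A minimal usage sketch for the integration tests (the method names come from
the class above; the Spark version string is a hypothetical example, while the
Flink version matches this PR's upgrade):

    String sparkHome = SparkDownloadUtils.downloadSpark("2.3.1");  // hypothetical version
    String flinkHome = SparkDownloadUtils.downloadFlink("1.5.2");  // version from this PR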
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index b64b15b..6f9f81f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -225,7 +225,9 @@ public class RemoteInterpreter extends Interpreter {
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
}.getType());
context.getConfig().clear();
- context.getConfig().putAll(remoteConfig);
+ if (remoteConfig != null) {
+ context.getConfig().putAll(remoteConfig);
+ }
GUI currentGUI = context.getGui();
GUI currentNoteGUI = context.getNoteGui();
if (form == FormType.NATIVE) {