You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/09 03:12:08 UTC

[2/2] zeppelin git commit: ZEPPELIN-3569. Improvement of FlinkInterpreter

ZEPPELIN-3569. Improvement of FlinkInterpreter

### What is this PR for?
This PR refactors the Flink interpreter and also introduces several major features.

Here are the main changes:

1. Upgrade Flink to 1.5.2
2. Support ZeppelinContext
3. Support %flink.sql
4. Support YARN mode

### What type of PR is it?
[Improvement | Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3569

### How should this be tested?
* Unit tests are added

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #3054 from zjffdu/ZEPPELIN-3569 and squashes the following commits:

a256a95fd [Jeff Zhang] ZEPPELIN-3569. Improvement of FlinkInterpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1c5b38a9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1c5b38a9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1c5b38a9

Branch: refs/heads/master
Commit: 1c5b38a9afb211b4681a9fe99f9740add76d0f37
Parents: 8e5013c
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jun 5 14:28:21 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Aug 9 11:12:00 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 bin/interpreter.sh                              |  15 +
 flink/pom.xml                                   |  45 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java | 420 +++----------------
 .../zeppelin/flink/FlinkSQLInterpreter.java     |  72 ++++
 .../src/main/resources/interpreter-setting.json |  19 +
 .../flink/FlinkSQLScalaInterpreter.scala        |  42 ++
 .../zeppelin/flink/FlinkScalaInterpreter.scala  | 230 ++++++++++
 .../zeppelin/flink/FlinkZeppelinContext.scala   | 177 ++++++++
 .../zeppelin/flink/FlinkInterpreterTest.java    | 263 +++++++++---
 .../zeppelin/flink/FlinkSQLInterpreterTest.java | 110 +++++
 flink/src/test/resources/flink-conf.yaml        | 247 +++++++++++
 flink/src/test/resources/log4j.properties       |  24 ++
 .../launcher/StandardInterpreterLauncher.java   |   6 +
 zeppelin-server/pom.xml                         |   8 -
 .../interpreter/SparkDownloadUtils.java         | 136 ++++++
 .../interpreter/remote/RemoteInterpreter.java   |   4 +-
 .../interpreter/FlinkIntegrationTest.java       | 116 +++++
 .../interpreter/SparkDownloadUtils.java         | 110 -----
 .../src/test/resources/flink-conf.yaml          |   0
 20 files changed, 1486 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 5249ef2..5ddbaa5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,7 +41,7 @@ addons:
 env:
   global:
     # Interpreters does not required by zeppelin-server integration tests
-    - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java'
+    - INTERPRETERS='!beam,!hbase,!pig,!jdbc,!file,!ignite,!kylin,!lens,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!r,!java'
 
 matrix:
   include:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 48cf0f6..c895018 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -201,6 +201,21 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
   else
     echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
   fi
+elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
+  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+    export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
+  else
+    # autodetect HADOOP_CONF_HOME by heuristic
+    if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+      if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+        export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+      elif [[ -d "/etc/hadoop/conf" ]]; then
+        export HADOOP_CONF_DIR="/etc/hadoop/conf"
+      fi
+    fi
+  fi
+
 fi
 
 addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 70c076d..217813b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -27,7 +27,7 @@
   </parent>
 
   <groupId>org.apache.zeppelin</groupId>
-  <artifactId>zeppelin-flink_2.10</artifactId>
+  <artifactId>zeppelin-flink</artifactId>
   <packaging>jar</packaging>
   <version>0.9.0-SNAPSHOT</version>
   <name>Zeppelin: Flink</name>
@@ -36,15 +36,18 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>1.1.3</flink.version>
+    <flink.version>1.5.2</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <scala.macros.version>2.0.1</scala.macros.version>
+    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.8</scala.version>
 
     <!--plugin versions-->
     <plugin.scalamaven.version>3.2.2</plugin.scalamaven.version>
     <plugin.eclipse.version>2.8</plugin.eclipse.version>
     <plugin.buildhelper.version>1.7</plugin.buildhelper.version>
     <plugin.scalastyle.version>0.5.0</plugin.scalastyle.version>
+
   </properties>
 
   <dependencies>
@@ -90,38 +93,32 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
+      <artifactId>flink-yarn_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-actor_${scala.binary.version}</artifactId>
-      <version>${flink.akka.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-      <version>${flink.akka.version}</version>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
-      <version>${flink.akka.version}</version>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
-      <version>${flink.akka.version}</version>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>20.0</version>
     </dependency>
 
     <dependency>
@@ -282,6 +279,16 @@
       </plugin>
 
       <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <environmentVariables>
+            <FLINK_CONF_DIR>${project.build.directory}/test-classes</FLINK_CONF_DIR>
+          </environmentVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
         <artifactId>maven-enforcer-plugin</artifactId>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 9d66437..c14407d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,403 +14,90 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zeppelin.flink;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.scala.FlinkILoop;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 
-import scala.Console;
-import scala.Some;
-import scala.collection.JavaConversions;
-import scala.concurrent.duration.FiniteDuration;
-import scala.runtime.AbstractFunction0;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
+package org.apache.zeppelin.flink;
 
+import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 
-/**
- * Interpreter for Apache Flink (http://flink.apache.org).
- */
-public class FlinkInterpreter extends Interpreter {
-  Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
-  private ByteArrayOutputStream out;
-  private Configuration flinkConf;
-  private LocalFlinkMiniCluster localFlinkCluster;
-  private FlinkILoop flinkIloop;
-  private Map<String, Object> binder;
-  private IMain imain;
-
-  public FlinkInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    out = new ByteArrayOutputStream();
-    flinkConf = new org.apache.flink.configuration.Configuration();
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      flinkConf.setString(key, val);
-    }
-
-    if (localMode()) {
-      startFlinkMiniCluster();
-    }
-
-    String[] externalJars = new String[0];
-    String localRepo = getProperty("zeppelin.interpreter.localRepo");
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          externalJars = new String[files.length];
-          for (int i = 0; i < files.length; i++) {
-            if (externalJars.length > 0) {
-              externalJars[i] = files[i].getAbsolutePath();
-            }
-          }
-        }
-      }
-    }
-
-    flinkIloop = new FlinkILoop(getHost(),
-        getPort(),
-        flinkConf,
-        new Some<>(externalJars),
-        (BufferedReader) null,
-        new PrintWriter(out));
-
-    flinkIloop.settings_$eq(createSettings());
-    flinkIloop.createInterpreter();
-
-    imain = flinkIloop.intp();
-
-    org.apache.flink.api.scala.ExecutionEnvironment benv =
-            flinkIloop.scalaBenv();
-    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
-            flinkIloop.scalaSenv();
-
-    senv.getConfig().disableSysoutLogging();
-    benv.getConfig().disableSysoutLogging();
-
-    // prepare bindings
-    imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
-    Map<String, Object> binder = (Map<String, Object>) getLastObject();
-
-    // import libraries
-    imain.interpret("import scala.tools.nsc.io._");
-    imain.interpret("import Properties.userHome");
-    imain.interpret("import scala.compat.Platform.EOL");
-
-    imain.interpret("import org.apache.flink.api.scala._");
-    imain.interpret("import org.apache.flink.api.common.functions._");
-
-
-    binder.put("benv", benv);
-    imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
-            + benv.getClass().getName() + "]");
-
-    binder.put("senv", senv);
-    imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
-            + senv.getClass().getName() + "]");
-
-  }
-
-  private boolean localMode() {
-    String host = getProperty("host");
-    return host == null || host.trim().length() == 0 || host.trim().equals("local");
-  }
-
-  private String getHost() {
-    if (localMode()) {
-      return "localhost";
-    } else {
-      return getProperty("host");
-    }
-  }
-
-  private int getPort() {
-    if (localMode()) {
-      return localFlinkCluster.getLeaderRPCPort();
-    } else {
-      return Integer.parseInt(getProperty("port"));
-    }
-  }
-
-  private Settings createSettings() {
-    URL[] urls = getClassloaderUrls();
-    Settings settings = new Settings();
-
-    // set classpath
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
-        .getContextClassLoader()));
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    // To prevent 'File name too long' error on some file system.
-    MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
-    numClassFileSetting.v_$eq(128);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
-            numClassFileSetting);
-
-    return settings;
-  }
-  
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
 
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<>();
-    if (cl == null) {
-      return paths;
-    }
+public class FlinkInterpreter extends Interpreter {
 
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
+  private FlinkScalaInterpreter innerIntp;
+  private FlinkZeppelinContext z;
 
-  public Object getLastObject() {
-    Object obj = imain.lastRequest().lineRep().call(
-        "$result",
-        JavaConversions.asScalaBuffer(new LinkedList<>()));
-    return obj;
+  public FlinkInterpreter(Properties properties) {
+    super(properties);
+    this.innerIntp = new FlinkScalaInterpreter(getProperties());
   }
 
   @Override
-  public void close() {
-    flinkIloop.closeInterpreter();
+  public void open() throws InterpreterException {
+    this.innerIntp.open();
 
-    if (localMode()) {
-      stopFlinkMiniCluster();
-    }
+    // bind ZeppelinContext
+    int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
+    this.z = new FlinkZeppelinContext(innerIntp.getBatchTableEnviroment(),
+        getInterpreterGroup().getInterpreterHookRegistry(), maxRow);
+    List<String> modifiers = new ArrayList<>();
+    modifiers.add("@transient");
+    this.innerIntp.bind("z", z.getClass().getCanonicalName(), z, modifiers);
   }
 
   @Override
-  public InterpreterResult interpret(String line, InterpreterContext context) {
-    if (line == null || line.trim().length() == 0) {
-      return new InterpreterResult(Code.SUCCESS);
-    }
-
-    InterpreterResult result = interpret(line.split("\n"), context);
-    return result;
+  public void close() throws InterpreterException {
+    this.innerIntp.close();
   }
 
-  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
-    final IMain imain = flinkIloop.intp();
-    
-    String[] linesToRun = new String[lines.length + 1];
-    for (int i = 0; i < lines.length; i++) {
-      linesToRun[i] = lines[i];
-    }
-    linesToRun[lines.length] = "print(\"\")";
-
-    System.setOut(new PrintStream(out));
-    out.reset();
-    Code r = null;
-
-    String incomplete = "";
-    boolean inComment = false;
-
-    for (int l = 0; l < linesToRun.length; l++) {
-      final String s = linesToRun[l];
-      // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
-      if (l + 1 < linesToRun.length) {
-        String nextLine = linesToRun[l + 1].trim();
-        boolean continuation = false;
-        if (nextLine.isEmpty()
-                || nextLine.startsWith("//")         // skip empty line or comment
-                || nextLine.startsWith("}")
-                || nextLine.startsWith("object")) { // include "} object" for Scala companion object
-          continuation = true;
-        } else if (!inComment && nextLine.startsWith("/*")) {
-          inComment = true;
-          continuation = true;
-        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
-          inComment = false;
-          continuation = true;
-        } else if (nextLine.length() > 1
-                && nextLine.charAt(0) == '.'
-                && nextLine.charAt(1) != '.'     // ".."
-                && nextLine.charAt(1) != '/') {  // "./"
-          continuation = true;
-        } else if (inComment) {
-          continuation = true;
-        }
-        if (continuation) {
-          incomplete += s + "\n";
-          continue;
-        }
-      }
-
-      final String currentCommand = incomplete;
-
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      try {
-        res = Console.withOut(
-          System.out,
-          new AbstractFunction0<Results.Result>() {
-            @Override
-            public Results.Result apply() {
-              return imain.interpret(currentCommand + s);
-            }
-          });
-      } catch (Exception e) {
-        logger.info("Interpreter exception", e);
-        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-      }
-
-      r = getResultCode(res);
-
-      if (r == Code.ERROR) {
-        return new InterpreterResult(r, out.toString());
-      } else if (r == Code.INCOMPLETE) {
-        incomplete += s + "\n";
-      } else {
-        incomplete = "";
-      }
-    }
-
-    if (r == Code.INCOMPLETE) {
-      return new InterpreterResult(r, "Incomplete expression");
-    } else {
-      return new InterpreterResult(r, out.toString());
-    }
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context)
+      throws InterpreterException {
+    this.z.setInterpreterContext(context);
+    this.z.setGui(context.getGui());
+    this.z.setNoteGui(context.getNoteGui());
+    return innerIntp.interpret(st, context);
   }
 
   @Override
-  public void cancel(InterpreterContext context) {
-    if (localMode()) {
-      // In localMode we can cancel all running jobs,
-      // because the local cluster can only run one job at the time.
-      for (JobID job : this.localFlinkCluster.getCurrentlyRunningJobsJava()) {
-        logger.info("Stop job: " + job);
-        cancelJobLocalMode(job);
-      }
-    }
-  }
+  public void cancel(InterpreterContext context) throws InterpreterException {
 
-  private void cancelJobLocalMode(JobID jobID){
-    FiniteDuration timeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration());
-    ActorGateway leader = this.localFlinkCluster.getLeaderGateway(timeout);
-    leader.ask(new JobManagerMessages.CancelJob(jobID), timeout);
   }
 
   @Override
-  public FormType getFormType() {
+  public FormType getFormType() throws InterpreterException {
     return FormType.NATIVE;
   }
 
   @Override
-  public int getProgress(InterpreterContext context) {
+  public int getProgress(InterpreterContext context) throws InterpreterException {
     return 0;
   }
 
   @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return new LinkedList<>();
+  public List<InterpreterCompletion> completion(String buf,
+                                                int cursor,
+                                                InterpreterContext interpreterContext)
+      throws InterpreterException {
+    return innerIntp.completion(buf, cursor, interpreterContext);
   }
 
-  private void startFlinkMiniCluster() {
-    localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
-
-    try {
-      localFlinkCluster.start(true);
-    } catch (Exception e){
-      throw new RuntimeException("Could not start Flink mini cluster.", e);
-    }
+  FlinkScalaInterpreter getInnerScalaInterpreter() {
+    return this.innerIntp;
   }
 
-  private void stopFlinkMiniCluster() {
-    if (localFlinkCluster != null) {
-      localFlinkCluster.stop();
-      localFlinkCluster = null;
-    }
+  ExecutionEnvironment getExecutionEnviroment() {
+    return this.innerIntp.getExecutionEnviroment();
   }
 
-  static final String toString(Object o) {
-    return (o instanceof String) ? (String) o : "";
+  FlinkZeppelinContext getZeppelinContext() {
+    return this.z;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
new file mode 100644
index 0000000..1ac3547
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSQLInterpreter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+import java.util.Properties;
+
+public class FlinkSQLInterpreter extends Interpreter {
+
+  private FlinkSQLScalaInterpreter sqlScalaInterpreter;
+
+  public FlinkSQLInterpreter(Properties properties) {
+    super(properties);
+  }
+
+
+  @Override
+  public void open() throws InterpreterException {
+    FlinkInterpreter flinkInterpreter =
+        getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+    FlinkZeppelinContext z = flinkInterpreter.getZeppelinContext();
+    int maxRow = Integer.parseInt(getProperty("zeppelin.flink.maxResult", "1000"));
+    this.sqlScalaInterpreter = new FlinkSQLScalaInterpreter(
+        flinkInterpreter.getInnerScalaInterpreter(), z, maxRow);
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context)
+      throws InterpreterException {
+    return sqlScalaInterpreter.interpret(st, context);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json
index f1a04bf..1463e3d 100644
--- a/flink/src/main/resources/interpreter-setting.json
+++ b/flink/src/main/resources/interpreter-setting.json
@@ -23,5 +23,24 @@
       "language": "scala",
       "editOnDblClick": false
     }
+  },
+
+  {
+    "group": "flink",
+    "name": "sql",
+    "className": "org.apache.zeppelin.flink.FlinkSQLInterpreter",
+    "properties": {
+      "zeppelin.flink.maxResult": {
+        "envName": "zeppelin.flink.maxResult",
+        "propertyName": "zeppelin.flink.maxResult",
+        "defaultValue": "1000",
+        "description": "max number of row returned by sql interpreter.",
+        "type": "number"
+      }
+    },
+    "editor": {
+      "language": "sql",
+      "editOnDblClick": false
+    }
   }
 ]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
new file mode 100644
index 0000000..1694a44
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+
+class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter,
+                               z: FlinkZeppelinContext,
+                               maxRow: Int) {
+
+  private var btenv: BatchTableEnvironment = scalaInterpreter.getBatchTableEnviroment()
+
+  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+    try {
+      val table: Table = this.btenv.sql(code)
+      val result = z.showData(table)
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, result)
+    } catch {
+      case e: Exception =>
+        return new InterpreterResult(InterpreterResult.Code.ERROR,
+          "Fail to fetch result: " + e.getMessage)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
new file mode 100644
index 0000000..0653c2a
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.io.BufferedReader
+import java.nio.file.Files
+import java.util.Properties
+
+import org.apache.flink.api.scala.FlinkShell._
+import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.tools.nsc.interpreter.{JPrintWriter, SimpleReader}
+
+class FlinkScalaInterpreter(val properties: Properties) {
+
+  lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
+  private var flinkILoop: FlinkILoop = _
+  private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster],
+    ClusterClient[_]]] = _
+  private var scalaCompleter: ScalaCompleter = _
+  private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+
+  private var benv: ExecutionEnvironment = _
+  private var senv: StreamExecutionEnvironment = _
+  private var btenv: BatchTableEnvironment = _
+  private var stenv: StreamTableEnvironment = _
+  private var z: FlinkZeppelinContext = _
+
+  /**
+    * Starts the Flink Scala REPL and creates the batch/stream execution and table environments.
+    *
+    * Reads `flink.execution.mode` (default "LOCAL"), `flink.yarn.num_container` (default "1")
+    * and `zeppelin.flink.disableSysoutLogging` (default "true") from `properties`, and loads
+    * the Flink configuration from the directory named by the FLINK_CONF_DIR environment variable.
+    */
+  def open(): Unit = {
+    var config = Config(executionMode = ExecutionMode.withName(
+      properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase))
+    val containerNum = Integer.parseInt(properties.getProperty("flink.yarn.num_container", "1"))
+    // the yarn config is filled in unconditionally, whatever the execution mode is
+    config = config.copy(yarnConfig =
+      Some(ensureYarnConfig(config).copy(containers = Some(containerNum))))
+    val configuration = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR"))
+    // everything the REPL prints goes through interpreterOutput
+    val replOut = new JPrintWriter(interpreterOutput, true)
+
+    val (iLoop, cluster) = try {
+      val (host, port, cluster) = fetchConnectionInfo(configuration, config)
+      // pick the configuration that belongs to whichever cluster handle was returned
+      val conf = cluster match {
+        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
+        case Some(Left(Right(_))) => configuration
+        case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
+        case None => configuration
+      }
+      LOGGER.info(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
+      val repl = new FlinkILoop(host, port, conf, config.externalJars, None, replOut)
+
+      (repl, cluster)
+    } catch {
+      case e: IllegalArgumentException =>
+        // NOTE(review): sys.exit() terminates the whole JVM, not only this interpreter,
+        // and the message goes to stdout rather than to LOGGER
+        println(s"Error: ${e.getMessage}")
+        sys.exit()
+    }
+
+    this.flinkILoop = iLoop
+    this.cluster = cluster
+    val settings = new Settings()
+    settings.usejavacp.value = true
+    settings.Yreplsync.value = true
+
+    // classes generated by the REPL are written to a fresh temporary directory
+    val outputDir = Files.createTempDirectory("flink-repl");
+    val interpArguments = List(
+      "-Yrepl-class-based",
+      "-Yrepl-outdir", s"${outputDir.toFile.getAbsolutePath}"
+    )
+    settings.processArguments(interpArguments, true)
+
+    flinkILoop.settings = settings
+    flinkILoop.createInterpreter()
+
+    // ILoop's in0 member is read reflectively through its name-mangled identifier
+    val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0")
+      .asInstanceOf[Option[BufferedReader]]
+    val reader = in0.fold(flinkILoop.chooseReader(settings))(r =>
+      SimpleReader(r, replOut, interactive = true))
+
+    flinkILoop.in = reader
+    flinkILoop.initializeSynchronous()
+    // loopPostInit is likewise invoked reflectively through its name-mangled identifier
+    callMethod(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$loopPostInit")
+    this.scalaCompleter = reader.completion.completer()
+
+    this.benv = flinkILoop.scalaBenv
+    this.senv = flinkILoop.scalaSenv
+    this.btenv = TableEnvironment.getTableEnvironment(this.benv)
+    this.stenv = TableEnvironment.getTableEnvironment(this.senv)
+    // expose both table environments to user code as `btenv` and `stenv`
+    bind("btenv", btenv.getClass.getCanonicalName, btenv, List("@transient"))
+    bind("stenv", stenv.getClass.getCanonicalName, stenv, List("@transient"))
+
+    if (java.lang.Boolean.parseBoolean(
+      properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) {
+      this.benv.getConfig.disableSysoutLogging()
+      this.senv.getConfig.disableSysoutLogging()
+    }
+  }
+
+  /**
+    * Binds `value` into the REPL under `name`. This variant takes a java.util.List of
+    * modifiers so that it can be used from the java side.
+    */
+  protected def bind(name: String,
+                     tpe: String,
+                     value: Object,
+                     modifier: java.util.List[String]): Unit = {
+    val scalaModifiers: List[String] = modifier.asScala.toList
+    bind(name, tpe, value, scalaModifiers)
+  }
+
+  /**
+    * Binds `value` into the REPL under `name` with the declared type `tpe`.
+    * The call is wrapped in the ILoop's `beQuietDuring`.
+    */
+  protected def bind(name: String,
+                     tpe: String,
+                     value: Object,
+                     modifier: List[String]): Unit = {
+    flinkILoop.beQuietDuring {
+      flinkILoop.bind(name, tpe, value, modifier)
+    }
+  }
+
+  /**
+    * Asks the REPL completer for candidates for `buf` at position `cursor`.
+    * Each candidate string is passed as the first two constructor arguments of an
+    * InterpreterCompletion; the third argument is null. `context` is not read.
+    */
+  protected def completion(buf: String,
+                           cursor: Int,
+                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val candidates = scalaCompleter.complete(buf, cursor).candidates
+    candidates.map(candidate => new InterpreterCompletion(candidate, candidate, null)).asJava
+  }
+
+  /** Reflectively invokes the no-argument method `name` on `obj`. */
+  protected def callMethod(obj: Object, name: String): Object = {
+    val noParameterTypes = Array.empty[Class[_]]
+    val noParameters = Array.empty[Object]
+    callMethod(obj, name, noParameterTypes, noParameters)
+  }
+
+  /**
+    * Looks up `name` with `Class.getMethod` on the runtime class of `obj`, marks the
+    * method accessible and invokes it with `parameters`.
+    *
+    * @return whatever the invoked method returns
+    */
+  protected def callMethod(obj: Object, name: String,
+                           parameterTypes: Array[Class[_]],
+                           parameters: Array[Object]): Object = {
+    val target = obj.getClass.getMethod(name, parameterTypes: _ *)
+    target.setAccessible(true)
+    target.invoke(obj, parameters: _ *)
+  }
+
+
+  /**
+    * Looks up the field `name` with `Class.getField` on the runtime class of `obj`,
+    * marks it accessible and returns its current value.
+    */
+  protected def getField(obj: Object, name: String): Object = {
+    val target = obj.getClass.getField(name)
+    target.setAccessible(true)
+    target.get(obj)
+  }
+
+  /**
+    * Runs `code` in the Flink REPL. While the code executes, both the Scala `Console`
+    * output and the java `System.out` are routed to `context.out`; the previous
+    * `System.out` is put back afterwards, also when interpretation throws.
+    *
+    * @param code    Scala source to evaluate
+    * @param context interpreter context whose `out` receives the REPL output
+    * @return an InterpreterResult carrying only the status code (SUCCESS, ERROR or INCOMPLETE)
+    */
+  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+
+    val originalOut = System.out
+
+    def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
+      Console.withOut(interpreterOutput) {
+        // point the java stdout at the same stream the Scala Console is using
+        System.setOut(Console.out)
+        interpreterOutput.setInterpreterOutput(context.out)
+        interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
+        context.out.clear()
+
+        val status = flinkILoop.interpret(code) match {
+          case scala.tools.nsc.interpreter.IR.Success =>
+            scala.tools.nsc.interpreter.IR.Success
+          case scala.tools.nsc.interpreter.IR.Error =>
+            scala.tools.nsc.interpreter.IR.Error
+          case scala.tools.nsc.interpreter.IR.Incomplete =>
+            // add print("") at the end in case the last line is comment which lead to INCOMPLETE
+            flinkILoop.interpret(code + "\nprint(\"\")")
+        }
+        context.out.flush()
+        status
+      }
+    }
+
+    val lastStatus = try {
+      _interpret(code) match {
+        case scala.tools.nsc.interpreter.IR.Success =>
+          InterpreterResult.Code.SUCCESS
+        case scala.tools.nsc.interpreter.IR.Error =>
+          InterpreterResult.Code.ERROR
+        case scala.tools.nsc.interpreter.IR.Incomplete =>
+          InterpreterResult.Code.INCOMPLETE
+      }
+    } finally {
+      // reset the java stdout; this has to happen after _interpret has run, otherwise
+      // System.out stays redirected to the interpreter output for the rest of the process
+      System.setOut(originalOut)
+    }
+    new InterpreterResult(lastStatus)
+  }
+
+  /**
+    * Closes the REPL if one was created, then stops whichever cluster handle open() stored:
+    * either kind of mini cluster is closed, a yarn cluster client is shut down.
+    */
+  def close(): Unit = {
+    Option(flinkILoop).foreach(_.close())
+    // `cluster` stays null until open() assigns it and may hold None afterwards
+    Option(cluster).flatten.foreach {
+      case Left(Left(legacyMiniCluster)) => legacyMiniCluster.close()
+      case Left(Right(newMiniCluster)) => newMiniCluster.close()
+      case Right(yarnCluster) => yarnCluster.shutdown()
+    }
+  }
+
+  // Accessors for the environments assigned in open(); each returns null until open() has run.
+  // The "Enviroment" spelling is part of the method names that callers use, so it is kept.
+
+  // batch ExecutionEnvironment taken from the REPL
+  def getExecutionEnviroment(): ExecutionEnvironment = this.benv
+
+  // StreamExecutionEnvironment taken from the REPL
+  def getStreamingExecutionEnviroment(): StreamExecutionEnvironment = this.senv
+
+  // BatchTableEnvironment created on top of the batch ExecutionEnvironment
+  def getBatchTableEnviroment(): BatchTableEnvironment = this.btenv
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
new file mode 100644
index 0000000..5246445
--- /dev/null
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+import java.util
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.types.Row
+import org.apache.zeppelin.annotation.ZeppelinApi
+import org.apache.zeppelin.display.AngularObjectWatcher
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption
+import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext,
+  InterpreterHookRegistry}
+
+import scala.collection.{JavaConversions, Seq}
+
+
+/**
+  * ZeppelinContext for Flink
+  */
+class FlinkZeppelinContext(val btenv: BatchTableEnvironment,
+                           val hooks2: InterpreterHookRegistry,
+                           val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) {
+
+  // Maps short interpreter names to fully qualified interpreter class names.
+  // The SQL interpreter class is spelled FlinkSQLInterpreter (upper-case "SQL").
+  private val interpreterClassMap = Map(
+    "flink" -> "org.apache.zeppelin.flink.FlinkInterpreter",
+    "sql" -> "org.apache.zeppelin.flink.FlinkSQLInterpreter"
+  )
+
+  // Classes reported by getSupportedClasses. Only DataSet is listed here, although
+  // showData in this class also renders Table instances.
+  private val supportedClasses = Seq(classOf[DataSet[_]])
+
+  // java.util.List view of supportedClasses
+  override def getSupportedClasses: util.List[Class[_]] =
+    JavaConversions.seqAsJavaList(supportedClasses)
+
+  // java.util.Map view of interpreterClassMap
+  override def getInterpreterClassMap: util.Map[String, String] =
+    JavaConversions.mapAsJavaMap(interpreterClassMap)
+
+  /**
+    * Renders `obj` for display. A DataSet (converted to a Table first) or a Table becomes
+    * a Zeppelin `%table` block: a tab separated header line followed by at most
+    * `maxResult` tab separated rows. Any other object is rendered with `toString`.
+    */
+  override def showData(obj: Any): String = {
+    def renderTable(table: Table): String = {
+      val header = table.getSchema.getColumnNames.mkString("\t")
+      val dataSet: DataSet[Row] = btenv.toDataSet[Row](table)
+      val out = new StringBuilder("%table ")
+      out.append(header)
+      out.append("\n")
+      dataSet.first(maxResult).collect().foreach { row =>
+        val cells = (0 until row.getArity).map(index => row.getField(index))
+        out.append(cells.mkString("\t"))
+        out.append("\n")
+      }
+      // append %text at the end, otherwise the following output will be put in table as well.
+      out.append("\n%text ")
+      out.toString()
+    }
+
+    obj match {
+      case dataSet: DataSet[_] => renderTable(btenv.fromDataSet(dataSet))
+      case table: Table => renderTable(table)
+      case other => other.toString
+    }
+  }
+
+
+  /** Select form without an explicit default; null is forwarded as the default value. */
+  @ZeppelinApi
+  def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
+
+  /** Select form; every (value, display name) pair is wrapped in a ParamOption. */
+  @ZeppelinApi
+  def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any = {
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    select(name, defaultValue, paramOptions.toArray)
+  }
+
+  /** Checkbox form whose default-checked list is the full list of option values. */
+  @ZeppelinApi
+  def checkbox(name: String, options: Seq[(AnyRef, String)]): Seq[Any] = {
+    val allValues = JavaConversions.seqAsJavaList(options.map(option => option._1))
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    val checked = checkbox(name, allValues, paramOptions.toArray)
+    JavaConversions.asScalaBuffer(checked)
+  }
+
+  /** Checkbox form with an explicit list of default-checked values. */
+  @ZeppelinApi
+  def checkbox(name: String, defaultChecked: Seq[AnyRef], options: Seq[(Any, String)]): Seq[Any] = {
+    val defaults = JavaConversions.seqAsJavaList(defaultChecked)
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    val checked = checkbox(name, defaults, paramOptions.toArray)
+    JavaConversions.asScalaBuffer(checked)
+  }
+
+  /** Note level select form without an explicit default; "" is forwarded as the default. */
+  @ZeppelinApi
+  def noteSelect(name: String, options: Seq[(Any, String)]): Any = noteSelect(name, "", options)
+
+  /** Note level select form; every (value, display name) pair is wrapped in a ParamOption. */
+  @ZeppelinApi
+  def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, String)]): AnyRef = {
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    noteSelect(name, defaultValue, paramOptions.toArray)
+  }
+
+  /** Note level checkbox form whose default-checked list is the full list of option values. */
+  @ZeppelinApi
+  def noteCheckbox(name: String, options: Seq[(AnyRef, String)]): Seq[AnyRef] = {
+    val allValues = JavaConversions.seqAsJavaList(options.map(option => option._1))
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    val checked = noteCheckbox(name, allValues, paramOptions.toArray)
+    JavaConversions.asScalaBuffer(checked)
+  }
+
+  /** Note level checkbox form with an explicit list of default-checked values. */
+  @ZeppelinApi
+  def noteCheckbox(name: String,
+                   defaultChecked: Seq[AnyRef],
+                   options: Seq[(AnyRef, String)]): Seq[AnyRef] = {
+    val defaults = JavaConversions.seqAsJavaList(defaultChecked)
+    val paramOptions = options.map(option => new ParamOption(option._1, option._2))
+    val checked = noteCheckbox(name, defaults, paramOptions.toArray)
+    JavaConversions.asScalaBuffer(checked)
+  }
+
+  // Watches the angular object `name` in the note of the current interpreter context;
+  // `func` takes two AnyRef arguments.
+  @ZeppelinApi def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+    angularWatch(name, interpreterContext.getNoteId, func)
+  }
+
+  // Same as angularWatch(name, func) but passes null instead of a note id.
+  @deprecated def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+    angularWatch(name, null, func)
+  }
+
+  // Variant whose callback additionally receives an InterpreterContext.
+  @ZeppelinApi def angularWatch(name: String,
+                                func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+    angularWatch(name, interpreterContext.getNoteId, func)
+  }
+
+  // Same as the three-argument-callback angularWatch but passes null instead of a note id.
+  @deprecated def angularWatchGlobal(name: String,
+                                     func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+    angularWatch(name, null, func)
+  }
+
+  /**
+    * Registers `func` as a watcher on the angular object `name` for the given note id.
+    * `func` is called with (oldObject, newObject); the watcher's InterpreterContext
+    * argument is not forwarded.
+    */
+  private def angularWatch(name: String, noteId: String, func: (AnyRef, AnyRef) => Unit): Unit = {
+    val w = new AngularObjectWatcher(getInterpreterContext) {
+      override def watch(oldObject: AnyRef,
+                         newObject: AnyRef,
+                         context: InterpreterContext): Unit = {
+        // hand over the previous value as the first argument, not newObject twice
+        func(oldObject, newObject)
+      }
+    }
+    angularWatch(name, noteId, w)
+  }
+
+  // Wraps `func` in an AngularObjectWatcher that forwards the old object, the new object
+  // and the context unchanged, then registers it for `name` and `noteId`.
+  private def angularWatch(name: String, noteId: String,
+                           func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = {
+    val w = new AngularObjectWatcher(getInterpreterContext) {
+      override def watch(oldObject: AnyRef,
+                         newObject: AnyRef,
+                         context: InterpreterContext): Unit = {
+        func(oldObject, newObject, context)
+      }
+    }
+    angularWatch(name, noteId, w)
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 128f567..0c42139 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,84 +16,240 @@
  */
 package org.apache.zeppelin.flink;
 
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.ui.CheckBox;
+import org.apache.zeppelin.display.ui.Select;
+import org.apache.zeppelin.display.ui.TextBox;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class FlinkInterpreterTest {
 
-  private static FlinkInterpreter flink;
-  private static InterpreterContext context;
+  private FlinkInterpreter interpreter;
+  private InterpreterContext context;
+
+  // catch the streaming output in onAppend
+  private volatile String output = "";
+  // catch the interpreter output in onUpdate
+  private List<InterpreterResultMessageOutput> messageOutput;
 
-  @BeforeClass
-  public static void setUp() {
+  @Before
+  public void setUp() throws InterpreterException {
     Properties p = new Properties();
-    flink = new FlinkInterpreter(p);
-    flink.open();
+    interpreter = new FlinkInterpreter(p);
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    interpreter.setInterpreterGroup(intpGroup);
+    interpreter.open();
     context = InterpreterContext.builder().build();
+    InterpreterContext.set(context);
   }
 
-  @AfterClass
-  public static void tearDown() {
-    flink.close();
-  }
-
-  @Test
-  public void testNextLineInvocation() {
-    assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret("\"123\"\n.toInt", context)
-            .code());
-  }
-
-  @Test
-  public void testNextLineComments() {
-    assertEquals(InterpreterResult.Code.SUCCESS,
-            flink.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+  @After
+  public void tearDown() throws InterpreterException {
+    interpreter.close();
   }
 
   @Test
-  public void testNextLineCompanionObject() {
-    String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter " +
-            "{\n def apply(x: Long) = new Counter()\n}";
-    assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret(code, context).code());
+  public void testBasicScala() throws InterpreterException, IOException {
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a: String = hello world\n", output);
+
+    result = interpreter.interpret("print(a)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", output);
+
+    // java stdout
+    result = interpreter.interpret("System.out.print(a)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", output);
+
+    // incomplete
+    result = interpreter.interpret("println(a", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.INCOMPLETE, result.code());
+
+    // syntax error
+    result = interpreter.interpret("println(b)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(output.contains("not found: value b"));
+
+    // multiple line
+    result = interpreter.interpret("\"123\".\ntoInt", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // single line comment
+    result = interpreter.interpret("/*comment here*/", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // multiple line comment
+    result = interpreter.interpret("/*line 1 \n line 2*/",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // test function
+    result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("print(add(1,2))", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // companion object
+    result = interpreter.interpret("class Counter {\n " +
+        "var value: Long = 0} \n" +
+        "object Counter {\n def apply(x: Long) = new Counter()\n}", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // case class
+    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, education : String," +
+            " balance : Integer)\n",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // ZeppelinContext
+    context = getInterpreterContext();
+    result = interpreter.interpret("val ds = benv.fromElements(1,2,3)\nz.show(ds)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, messageOutput.get(0).getType());
+    assertEquals("f0\n" +
+        "1\n" +
+        "2\n" +
+        "3\n", messageOutput.get(0).toInterpreterResultMessage().getData());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.input(\"name\", \"default_name\")",
+        context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("name") instanceof TextBox);
+    TextBox textBox = (TextBox) context.getGui().getForms().get("name");
+    assertEquals("name", textBox.getName());
+    assertEquals("default_name", textBox.getDefaultValue());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.checkbox(\"checkbox_1\", " +
+        "Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
+    CheckBox checkBox = (CheckBox) context.getGui().getForms().get("checkbox_1");
+    assertEquals("checkbox_1", checkBox.getName());
+    assertEquals(1, checkBox.getDefaultValue().length);
+    assertEquals("value_2", checkBox.getDefaultValue()[0]);
+    assertEquals(2, checkBox.getOptions().length);
+    assertEquals("value_1", checkBox.getOptions()[0].getValue());
+    assertEquals("name_1", checkBox.getOptions()[0].getDisplayName());
+    assertEquals("value_2", checkBox.getOptions()[1].getValue());
+    assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), " +
+        "Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
+    Select select = (Select) context.getGui().getForms().get("select_1");
+    assertEquals("select_1", select.getName());
+    // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2',
+    // but it is List(value_2)
+    // assertEquals("value_2", select.getDefaultValue());
+    assertEquals(2, select.getOptions().length);
+    assertEquals("value_1", select.getOptions()[0].getValue());
+    assertEquals("name_1", select.getOptions()[0].getDisplayName());
+    assertEquals("value_2", select.getOptions()[1].getValue());
+    assertEquals("name_2", select.getOptions()[1].getDisplayName());
   }
 
   @Test
-  public void testSimpleStatement() {
-    InterpreterResult result = flink.interpret("val a=1", context);
-    result = flink.interpret("print(a)", context);
-    assertEquals("1", result.message().get(0).getData());
+  public void testCompletion() throws InterpreterException {
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a: String = hello world\n", output);
+
+    List<InterpreterCompletion> completions = interpreter.completion("a.", 2,
+        getInterpreterContext());
+    assertTrue(completions.size() > 0);
   }
 
-  @Test
-  public void testSimpleStatementWithSystemOutput() {
-    InterpreterResult result = flink.interpret("val a=1", context);
-    result = flink.interpret("System.out.print(a)", context);
-    assertEquals("1", result.message().get(0).getData());
-  }
 
+  // TODO: not actually disabled (still annotated with @Test); flink shell may write extra std output.
   @Test
-  public void testWordCount() {
-    flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
-    flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" +
-            ".map { (_, 1) }.groupBy(0).sum(1)", context);
-    InterpreterResult result = flink.interpret("counts.print()", context);
-    assertEquals(Code.SUCCESS, result.code());
+  public void testWordCount() throws InterpreterException, IOException {
+    interpreter.interpret("val text = benv.fromElements(\"To be or not to be\")",
+        getInterpreterContext());
+    interpreter.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }" +
+        ".map { (_, 1) }.groupBy(0).sum(1)", getInterpreterContext());
+    InterpreterResult result = interpreter.interpret("counts.print()", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"};
     Arrays.sort(expectedCounts);
 
-    String[] counts = result.message().get(0).getData().split("\n");
+    String[] counts = output.split("\n");
     Arrays.sort(counts);
 
     assertArrayEquals(expectedCounts, counts);
   }
+
+  /**
+   * Builds a fresh InterpreterContext for one interpret call and resets the captured
+   * {@code output} and {@code messageOutput} fields.
+   *
+   * @return a context whose {@code out} reports appended data and updated messages
+   *     back into the fields of this test
+   */
+  private InterpreterContext getInterpreterContext() {
+    output = "";
+    messageOutput = new ArrayList<>();
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+        .build();
+    // replace the output created by the builder with one that has a listener attached
+    context.out = new InterpreterOutput(
+        new InterpreterOutputListener() {
+          @Override
+          public void onUpdateAll(InterpreterOutput out) {
+
+          }
+
+          @Override
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            try {
+              // keep the data of the message output that was appended to most recently
+              output = out.toInterpreterResultMessage().getData();
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            // collect every updated message output in arrival order
+            messageOutput.add(out);
+          }
+        });
+    return context;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
new file mode 100644
index 0000000..6993540
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkSQLInterpreterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link FlinkSQLInterpreter} on a table registered through a {@link FlinkInterpreter}
+ * that lives in the same interpreter group and session.
+ */
+public class FlinkSQLInterpreterTest {
+
+  private FlinkInterpreter interpreter;
+  private FlinkSQLInterpreter sqlInterpreter;
+
+  // catch the streaming output in onAppend
+  private volatile String output = "";
+  // catch the interpreter output in onUpdate
+  private InterpreterResultMessageOutput messageOutput;
+
+  /** Opens a Flink interpreter and a Flink SQL interpreter sharing one group and session. */
+  @Before
+  public void setUp() throws InterpreterException {
+    Properties p = new Properties();
+    interpreter = new FlinkInterpreter(p);
+    sqlInterpreter = new FlinkSQLInterpreter(p);
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    interpreter.setInterpreterGroup(intpGroup);
+    sqlInterpreter.setInterpreterGroup(intpGroup);
+    intpGroup.addInterpreterToSession(interpreter, "session_1");
+    intpGroup.addInterpreterToSession(sqlInterpreter, "session_1");
+
+    interpreter.open();
+    sqlInterpreter.open();
+  }
+
+  /** Closes the interpreters that {@link #setUp()} managed to create. */
+  @After
+  public void tearDown() throws InterpreterException {
+    if (sqlInterpreter != null) {
+      sqlInterpreter.close();
+    }
+    if (interpreter != null) {
+      interpreter.close();
+    }
+  }
+
+  /** Registers a two-row DataSet as {@code table_1} and checks the SQL result table. */
+  @Test
+  public void testSQLInterpreter() throws InterpreterException {
+    InterpreterResult result = interpreter.interpret(
+        "val ds = benv.fromElements((1, \"jeff\"), (2, \"andy\"))", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("btenv.registerDataSet(\"table_1\", ds)",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = sqlInterpreter.interpret("select * from table_1", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+    assertEquals("_1\t_2\n" +
+        "1\tjeff\n" +
+        "2\tandy\n", result.message().get(0).getData());
+  }
+
+  /**
+   * Resets the captured streaming output and builds a context whose output is constructed
+   * around a listener that stores what it receives in this test's fields.
+   */
+  private InterpreterContext getInterpreterContext() {
+    output = "";
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
+        .build();
+    context.out = new InterpreterOutput(
+        new InterpreterOutputListener() {
+          @Override
+          public void onUpdateAll(InterpreterOutput out) {
+
+          }
+
+          @Override
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            try {
+              output = out.toInterpreterResultMessage().getData();
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            messageOutput = out;
+          }
+        });
+    return context;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..1041d0f
--- /dev/null
+++ b/flink/src/test/resources/flink-conf.yaml
@@ -0,0 +1,247 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+#mode: legacy
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The heap size for the JobManager JVM
+
+jobmanager.heap.mb: 1024
+
+
+# The heap size for the TaskManager JVM
+
+taskmanager.heap.mb: 1024
+
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+# 
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+# 
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...) 
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled.
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend). 
+#
+# state.backend.incremental: false
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+# The address under which the web-based runtime monitor listens.
+#
+#jobmanager.web.address: 0.0.0.0
+
+# The port under which the web-based runtime monitor listens.
+# A value of -1 deactivates the web server.
+
+rest.port: 8081
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#jobmanager.web.submit.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+#     /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# Specify whether TaskManager's managed memory should be allocated when starting
+# up (true) or when memory is requested.
+#
+# We recommend setting this value to 'true' only in setups for pure batch
+# processing (DataSet API). Streaming setups currently do not use the TaskManager's
+# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
+# while the 'memory' and 'filesystem' backends explicitly keep data as objects
+# to save on serialization cost.
+#
+# taskmanager.memory.preallocate: false
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need 
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+# 
+# taskmanager.network.memory.fraction: 0.1
+# taskmanager.network.memory.min: 67108864
+# taskmanager.network.memory.max: 1073741824
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The settings below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts receive the Kerberos credentials.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/flink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..65b6d36
--- /dev/null
+++ b/flink/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+
+log4j.logger.org.apache.zeppelin.flink=WARN

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index 10ab354..9c7a0b2 100644
--- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -94,6 +94,12 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
       if (RemoteInterpreterUtils.isEnvString((String) key)) {
         env.put((String) key, context.getProperties().getProperty((String) key));
       }
+      // TODO(zjffdu) move this to FlinkInterpreterLauncher
+      if (key.toString().equals("FLINK_HOME")) {
+        String flinkHome = context.getProperties().get(key).toString();
+        env.put("FLINK_CONF_DIR", flinkHome + "/conf");
+        env.put("FLINK_LIB_DIR", flinkHome + "/lib");
+      }
     }
     return env;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 9bc2a4c..4eaedb2 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -368,14 +368,6 @@
       </dependency>
 
       <dependency>
-        <groupId>org.apache.zeppelin</groupId>
-        <artifactId>zeppelin-zengine</artifactId>
-        <version>${project.version}</version>
-        <classifier>tests</classifier>
-        <scope>test</scope>
-      </dependency>
-
-      <dependency>
         <groupId>org.bitbucket.cowwoc</groupId>
         <artifactId>diff-match-patch</artifactId>
         <version>1.1</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
new file mode 100644
index 0000000..157b989
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for downloading Spark and Flink binary distributions. This is used for
+ * integration tests.
+ *
+ * <p>Tarballs are fetched with {@code wget} and unpacked with {@code tar} into
+ * {@code ~/.cache/spark}; a distribution whose home folder already exists there is reused.
+ */
+public class SparkDownloadUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);
+
+  private static final String MIRROR_LOOKUP_URL =
+      "https://www.apache.org/dyn/closer.lua?preferred=true";
+  private static final String ARCHIVE_URL = "https://archive.apache.org/dist";
+  private static final int MIRROR_ATTEMPTS = 3;
+
+  private static final String downloadFolder = System.getProperty("user.home") + "/.cache/spark";
+
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create downloadFolder: " + downloadFolder, e);
+    }
+  }
+
+  /**
+   * Downloads the Spark distribution built for Hadoop 2.6 unless it is already downloaded.
+   *
+   * @param version Spark version, e.g. {@code 1.6.3}
+   * @return absolute path of the Spark home folder
+   * @throws RuntimeException if neither the mirror nor the Apache archive download succeeds
+   */
+  public static String downloadSpark(String version) {
+    File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+    if (targetSparkHomeFolder.exists()) {
+      LOGGER.info("Skip to download spark as it is already downloaded.");
+      return targetSparkHomeFolder.getAbsolutePath();
+    }
+    download("Spark", "spark", version, "spark-" + version + "-bin-hadoop2.6.tgz");
+    return targetSparkHomeFolder.getAbsolutePath();
+  }
+
+  /**
+   * Downloads the Flink distribution built for Hadoop 2.7 and Scala 2.11 unless it is already
+   * downloaded.
+   *
+   * @param version Flink version, e.g. {@code 1.5.2}
+   * @return absolute path of the Flink home folder
+   * @throws RuntimeException if neither the mirror nor the Apache archive download succeeds
+   */
+  public static String downloadFlink(String version) {
+    File targetFlinkHomeFolder = new File(downloadFolder + "/flink-" + version);
+    if (targetFlinkHomeFolder.exists()) {
+      LOGGER.info("Skip to download flink as it is already downloaded.");
+      return targetFlinkHomeFolder.getAbsolutePath();
+    }
+    download("Flink", "flink", version, "flink-" + version + "-bin-hadoop27-scala_2.11.tgz");
+    return targetFlinkHomeFolder.getAbsolutePath();
+  }
+
+  /**
+   * Downloads and unpacks one tarball, trying the preferred Apache mirror a few times before
+   * falling back to the Apache archive.
+   *
+   * @param displayName product name used in the retry warning, e.g. {@code Spark}
+   * @param project lower-case project name used in URLs and in the failure message
+   * @param version release version of the project
+   * @param tarball file name of the binary tarball
+   * @throws RuntimeException if the download from the Apache archive fails as well
+   */
+  private static void download(String displayName, String project, String version,
+                               String tarball) {
+    String relativePath = "/" + project + "/" + project + "-" + version + "/" + tarball;
+    File downloadFile = new File(downloadFolder + "/" + tarball);
+    // Try mirrors a few times until one succeeds
+    for (int i = 0; i < MIRROR_ATTEMPTS; i++) {
+      try {
+        String preferredMirror =
+            IOUtils.toString(new URL(MIRROR_LOOKUP_URL), StandardCharsets.UTF_8);
+        fetchAndExtract(preferredMirror + relativePath, downloadFile);
+        return;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to download " + displayName, e);
+      }
+    }
+    // fallback to use apache archive, e.g.
+    // https://archive.apache.org/dist/spark/spark-1.6.3/spark-1.6.3-bin-hadoop2.6.tgz
+    try {
+      fetchAndExtract(ARCHIVE_URL + relativePath, downloadFile);
+    } catch (Exception e) {
+      throw new RuntimeException("Fail to download " + project + " " + version, e);
+    }
+  }
+
+  /**
+   * Runs {@code wget} to save {@code downloadURL} under the download folder, then {@code tar}
+   * to unpack {@code downloadFile} into the same folder.
+   */
+  private static void fetchAndExtract(String downloadURL, File downloadFile)
+      throws IOException, InterruptedException {
+    runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+    runShellCommand(
+        new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+  }
+
+  /**
+   * Runs an external command and waits for it to finish.
+   *
+   * @param commands the program followed by its arguments
+   * @throws IOException if the command cannot be started or exits with a non-zero status
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
+    String commandLine = StringUtils.join(commands, " ");
+    LOGGER.info("Starting shell commands: " + commandLine);
+    Process process = Runtime.getRuntime().exec(commands);
+    // Drain stderr and stdout on separate threads while this thread waits for the exit status.
+    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
+    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
+    errorGobbler.start();
+    outputGobbler.start();
+    if (process.waitFor() != 0) {
+      throw new IOException("Fail to run shell commands: " + commandLine);
+    }
+    LOGGER.info("Complete shell commands: " + commandLine);
+  }
+
+  /** Thread that reads a stream until its end and logs at most one line every 5 seconds. */
+  private static class StreamGobbler extends Thread {
+    private final InputStream is;
+
+    // reads everything from is until empty.
+    StreamGobbler(InputStream is) {
+      this.is = is;
+    }
+
+    @Override
+    public void run() {
+      // try-with-resources closes the reader (and the underlying stream) on every exit path
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+        String line;
+        long startTime = System.currentTimeMillis();
+        while ((line = br.readLine()) != null) {
+          // logging per 5 seconds
+          if ((System.currentTimeMillis() - startTime) > 5000) {
+            LOGGER.info(line);
+            startTime = System.currentTimeMillis();
+          }
+        }
+      } catch (IOException ioe) {
+        LOGGER.warn("Fail to read output of shell command", ioe);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1c5b38a9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index b64b15b..6f9f81f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -225,7 +225,9 @@ public class RemoteInterpreter extends Interpreter {
                 remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
                 }.getType());
             context.getConfig().clear();
-            context.getConfig().putAll(remoteConfig);
+            if (remoteConfig != null) {
+              context.getConfig().putAll(remoteConfig);
+            }
             GUI currentGUI = context.getGui();
             GUI currentNoteGUI = context.getNoteGui();
             if (form == FormType.NATIVE) {