Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/07 07:39:17 UTC

[zeppelin] branch master updated: [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bd8e28  [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
7bd8e28 is described below

commit 7bd8e288b79033eded10ecbe075b4c938979bb68
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Apr 2 11:43:38 2020 +0800

    [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
    
    ### What is this PR for?
    This PR works around FLINK-16936 by re-creating the table environment's planner before executing any scala or python code. Building the planner is lightweight and does not cost much time, so this workaround is acceptable. This PR also fixes a ClassLoader issue in PyFlinkInterpreter: the context classloader is now always set before executing python code, so that the pyflink api can call UDFs defined in scala.
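    In essence, the workaround has two parts: re-create the planner under the Flink shell classloader before each run, and save/restore the context classloader around python execution. A minimal sketch of the pattern, with hypothetical names (not the actual Zeppelin classes; the real logic lives in FlinkScalaInterpreter, PyFlinkInterpreter and IPyFlinkInterpreter below):
    
        // Sketch only: illustrates the classloader + planner re-creation workaround.
        public class FlinkThreadWorkaround {
          private final ClassLoader flinkClassLoader; // loader that can see the table planner
          private ClassLoader originalClassLoader;
    
          public FlinkThreadWorkaround(ClassLoader flinkClassLoader) {
            this.flinkClassLoader = flinkClassLoader;
          }
    
          /** Re-create the planner in the current thread (FLINK-16936 workaround). */
          public void createPlannerAgain() {
            ClassLoader original = Thread.currentThread().getContextClassLoader();
            try {
              Thread.currentThread().setContextClassLoader(flinkClassLoader);
              // In the real code this delegates to TableEnvFactory.createPlanner(...), e.g.:
              // tblEnvFactory.createPlanner(
              //     EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
            } finally {
              Thread.currentThread().setContextClassLoader(original);
            }
          }
    
          /** Called from the python thread before user code runs. */
          public void initJavaThread() {
            originalClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(flinkClassLoader);
            createPlannerAgain();
          }
    
          /** Called from the python thread after user code runs (in a finally block). */
          public void resetClassLoaderInPythonThread() {
            if (originalClassLoader != null) {
              Thread.currentThread().setContextClassLoader(originalClassLoader);
            }
          }
        }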
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4714
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3715 from zjffdu/ZEPPELIN-4714 and squashes the following commits:
    
    55d613576 [Jeff Zhang] [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
---
 .travis.yml                                        |   1 +
 flink/pom.xml                                      |   5 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   7 ++
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  46 +++++--
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  54 ++++++--
 .../org/apache/zeppelin/flink/TableEnvFactory.java |  14 +++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  21 +++-
 .../flink/FlinkBatchSqlInterpreterTest.java        |   2 +-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    | 139 ++++++++++++++++-----
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |  17 +--
 flink/src/test/resources/log4j.properties          |   3 +-
 flink/src/test/resources/log4j2.properties         |   2 +-
 .../apache/zeppelin/python/PythonInterpreter.java  |   6 +-
 .../zeppelin/python/IPythonInterpreterTest.java    |   2 +-
 .../zeppelin/jupyter/JupyterKernelInterpreter.java |   2 -
 15 files changed, 246 insertions(+), 75 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 5082aac..d607a44 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -221,3 +221,4 @@ after_failure:
   - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
   - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
   - cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
+  - cat flink/*.log
diff --git a/flink/pom.xml b/flink/pom.xml
index 21ec182..e5dca7b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -637,7 +637,10 @@
           <skip>false</skip>
           <forkCount>1</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx3072m -XX:MaxPermSize=256m </argLine>
+          <!-- set sun.zip.disableMemoryMapping=true because of
+          https://blogs.oracle.com/poonam/crashes-in-zipgetentry
+          https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
+          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
 
           <environmentVariables>
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index b089b36..af4de3d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -137,6 +137,13 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp.getDefaultSqlParallelism();
   }
 
+  /**
+   * Workaround for issue of FLINK-16936.
+   */
+  public void createPlannerAgain() {
+    this.innerIntp.createPlannerAgain();
+  }
+
   public ClassLoader getFlinkScalaShellLoader() {
     return innerIntp.getFlinkScalaShellLoader();
   }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 970f6cf..5564a57 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
   private FlinkInterpreter flinkInterpreter;
   private InterpreterContext curInterpreterContext;
   private boolean opened = false;
+  private ClassLoader originalClassLoader;
 
   public IPyFlinkInterpreter(Properties property) {
     super(property);
@@ -78,16 +79,26 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
   public InterpreterResult internalInterpret(String st,
                                              InterpreterContext context)
           throws InterpreterException {
-    // set InterpreterContext in the python thread first, otherwise flink job could not be
-    // associated with paragraph in JobListener
-    this.curInterpreterContext = context;
-    InterpreterResult result =
-            super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
-    if (result.code() != InterpreterResult.Code.SUCCESS) {
-      throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
-              result.toString());
+    try {
+      // set InterpreterContext in the python thread first, otherwise flink job could not be
+      // associated with paragraph in JobListener
+      this.curInterpreterContext = context;
+      InterpreterResult result =
+              super.internalInterpret("intp.initJavaThread()", context);
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        throw new InterpreterException("Fail to initJavaThread: " +
+                result.toString());
+      }
+      return super.internalInterpret(st, context);
+    } finally {
+      if (getKernelProcessLauncher().isRunning()) {
+        InterpreterResult result =
+                super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+        }
+      }
     }
-    return super.internalInterpret(st, context);
   }
 
   @Override
@@ -105,8 +116,23 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     }
   }
 
-  public void setInterpreterContextInPythonThread() {
+  /**
+   * Called by python process.
+   */
+  public void initJavaThread() {
     InterpreterContext.set(curInterpreterContext);
+    originalClassLoader = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+    flinkInterpreter.createPlannerAgain();
+  }
+
+  /**
+   * Called by python process.
+   */
+  public void resetClassLoaderInPythonThread() {
+    if (originalClassLoader != null) {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 4ce4605..91ec0fe 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
   private FlinkInterpreter flinkInterpreter;
   private InterpreterContext curInterpreterContext;
   private boolean isOpened = false;
+  private ClassLoader originalClassLoader;
 
   public PyFlinkInterpreter(Properties properties) {
     super(properties);
@@ -103,22 +104,53 @@ public class PyFlinkInterpreter extends PythonInterpreter {
 
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
-    if (isOpened) {
-      // set InterpreterContext in the python thread first, otherwise flink job could not be
-      // associated with paragraph in JobListener
-      this.curInterpreterContext = context;
-      InterpreterResult result =
-              super.interpret("intp.setInterpreterContextInPythonThread()", context);
-      if (result.code() != InterpreterResult.Code.SUCCESS) {
-        throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
-                result.toString());
+    try {
+      if (isOpened) {
+        // set InterpreterContext in the python thread first, otherwise flink job could not be
+        // associated with paragraph in JobListener
+        this.curInterpreterContext = context;
+        InterpreterResult result =
+                super.interpret("intp.initJavaThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          throw new InterpreterException("Fail to initJavaThread: " +
+                  result.toString());
+        }
+      }
+      flinkInterpreter.createPlannerAgain();
+      return super.interpret(st, context);
+    } finally {
+      if (getPythonProcessLauncher().isRunning()) {
+        InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+        }
       }
     }
-    return super.interpret(st, context);
   }
 
-  public void setInterpreterContextInPythonThread() {
+  /**
+   * Called by python process.
+   */
+  public void initJavaThread() {
     InterpreterContext.set(curInterpreterContext);
+    originalClassLoader = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+    flinkInterpreter.createPlannerAgain();
+  }
+
+  /**
+   * Called by python process.
+   */
+  public void resetClassLoaderInPythonThread() {
+    if (originalClassLoader != null) {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    super.cancel(context);
+    flinkInterpreter.cancel(context);
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 5ab551e..6720bf2 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -182,6 +182,20 @@ public class TableEnvFactory {
             settings.isStreamingMode());
   }
 
+  public void createPlanner(EnvironmentSettings settings) {
+    Map<String, String> executorProperties = settings.toExecutorProperties();
+    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(
+                    plannerProperties,
+                    executor,
+                    tblConfig,
+                    blinkFunctionCatalog,
+                    catalogManager);
+  }
+
   public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
           EnvironmentSettings settings) {
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 4e6f3d0..32962da 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   private var mode: ExecutionMode.Value = _
 
+  private var tblEnvFactory: TableEnvFactory = _
   private var benv: ExecutionEnvironment = _
   private var senv: StreamExecutionEnvironment = _
 
@@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
         config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
       val classLoader = Thread.currentThread().getContextClassLoader
       try {
-        // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
+        // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
         // the TableFactory properly
         Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
         val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
@@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
       val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
       val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
 
-      val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
+      this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
         catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
 
       // blink planner
@@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
     field.get(obj)
   }
 
+  /**
+   * This is just a workaround to make table api work in multiple threads.
+   */
+  def createPlannerAgain(): Unit = {
+    val originalClassLoader = Thread.currentThread().getContextClassLoader
+    try {
+      Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
+      val stEnvSetting =
+        EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+      this.tblEnvFactory.createPlanner(stEnvSetting)
+    } finally {
+      Thread.currentThread().setContextClassLoader(originalClassLoader)
+    }
+  }
+
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+    createPlannerAgain()
     val originalStdOut = System.out
     val originalStdErr = System.err;
     if (context != null) {
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 651645b..fb51d57 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -84,7 +84,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     // select which use scala udf
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index fda41ad..eb678a2 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -33,6 +33,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Properties;
 
@@ -45,6 +46,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
   private RemoteInterpreterEventClient mockIntpEventClient =
           mock(RemoteInterpreterEventClient.class);
+  private LazyOpenInterpreter flinkScalaInterpreter;
 
   protected Properties initIntpProperties() {
     Properties p = new Properties();
@@ -62,12 +64,12 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     context.setIntpEventClient(mockIntpEventClient);
     InterpreterContext.set(context);
 
-    LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
+    this.flinkScalaInterpreter = new LazyOpenInterpreter(
         new FlinkInterpreter(properties));
     intpGroup = new InterpreterGroup();
     intpGroup.put("session_1", new ArrayList<Interpreter>());
-    intpGroup.get("session_1").add(flinkInterpreter);
-    flinkInterpreter.setInterpreterGroup(intpGroup);
+    intpGroup.get("session_1").add(flinkScalaInterpreter);
+    flinkScalaInterpreter.setInterpreterGroup(intpGroup);
 
     LazyOpenInterpreter pyFlinkInterpreter =
         new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
@@ -94,17 +96,17 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
   @Test
   public void testBatchIPyFlink() throws InterpreterException {
-    testBatchPyFlink(interpreter);
+    testBatchPyFlink(interpreter, flinkScalaInterpreter);
   }
 
   @Test
-  public void testStreamIPyFlink() throws InterpreterException {
-    testStreamPyFlink(interpreter);
+  public void testStreamIPyFlink() throws InterpreterException, IOException {
+    testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
-  public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
+  public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
     InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
-    InterpreterResult result = interpreter.interpret(
+    InterpreterResult result = pyflinkInterpreter.interpret(
         "import tempfile\n" +
         "import os\n" +
         "import shutil\n" +
@@ -131,6 +133,77 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
         "bt_env.execute(\"batch_job\")"
             , context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // use group by
+    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    result = pyflinkInterpreter.interpret(
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "b_env.set_parallelism(1)\n" +
+            "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+            "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.STRING())\n" +
+            "      .field(\"b\", DataTypes.BIGINT())\n" +
+            "      .field(\"c\", DataTypes.BIGINT())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.STRING())\n" +
+            "      .field(\"b\", DataTypes.BIGINT())\n" +
+            "      .field(\"c\", DataTypes.BIGINT())) \\\n" +
+            "    .register_table_sink(\"batch_sink4\")\n" +
+            "t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" +
+            "bt_env.execute(\"batch_job4\")"
+            , context);
+    assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+    // use scala udf in pyflink
+    // define scala udf
+    result = flinkScalaInterpreter.interpret(
+            "class AddOne extends ScalarFunction {\n" +
+                    "  def eval(a: java.lang.Long): String = a + \"\1\"\n" +
+                    "}", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = flinkScalaInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+            context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    result = pyflinkInterpreter.interpret(
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "b_env.set_parallelism(1)\n" +
+            "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+            "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .register_table_sink(\"batch_sink3\")\n" +
+            "t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
+            "bt_env.execute(\"batch_job3\")"
+            , context);
+    assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
   }
 
   @Override
@@ -149,33 +222,33 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     }
   }
 
-  public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
+  public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
     InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
     InterpreterResult result = interpreter.interpret(
-          "import tempfile\n" +
-          "import os\n" +
-          "import shutil\n" +
-          "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
-          "if os.path.exists(sink_path):\n" +
-          "    if os.path.isfile(sink_path):\n" +
-          "      os.remove(sink_path)\n" +
-          "    else:\n" +
-          "      shutil.rmtree(sink_path)\n" +
-          "s_env.set_parallelism(1)\n" +
-          "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
-          "st_env.connect(FileSystem().path(sink_path)) \\\n" +
-          "    .with_format(OldCsv()\n" +
-          "      .field_delimiter(',')\n" +
-          "      .field(\"a\", DataTypes.BIGINT())\n" +
-          "      .field(\"b\", DataTypes.STRING())\n" +
-          "      .field(\"c\", DataTypes.STRING())) \\\n" +
-          "    .with_schema(Schema()\n" +
-          "      .field(\"a\", DataTypes.BIGINT())\n" +
-          "      .field(\"b\", DataTypes.STRING())\n" +
-          "      .field(\"c\", DataTypes.STRING())) \\\n" +
-          "    .register_table_sink(\"stream_sink\")\n" +
-          "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
-          "st_env.execute(\"stream_job\")"
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "s_env.set_parallelism(1)\n" +
+            "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+            "st_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .register_table_sink(\"stream_sink\")\n" +
+            "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
+            "st_env.execute(\"stream_job\")"
             , context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index a42d594..7bbc1dd 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.flink;
 
 
 import com.google.common.io.Files;
-import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,6 +31,8 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.python.PythonInterpreterTest;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,7 +49,7 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
   private RemoteInterpreterEventClient mockRemoteEventClient =
           mock(RemoteInterpreterEventClient.class);
 
-  private Interpreter flinkInterpreter;
+  private Interpreter flinkScalaInterpreter;
   private Interpreter streamSqlInterpreter;
   private Interpreter batchSqlInterpreter;
 
@@ -77,9 +78,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
         .setIntpEventClient(mockRemoteEventClient)
         .build();
     InterpreterContext.set(context);
-    flinkInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
-    intpGroup.get("session_1").add(flinkInterpreter);
-    flinkInterpreter.setInterpreterGroup(intpGroup);
+    flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
+    intpGroup.get("session_1").add(flinkScalaInterpreter);
+    flinkScalaInterpreter.setInterpreterGroup(intpGroup);
 
     LazyOpenInterpreter iPyFlinkInterpreter =
         new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
@@ -108,9 +109,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
   }
 
   @Test
-  public void testPyFlink() throws InterpreterException {
-    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
-    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
+  public void testPyFlink() throws InterpreterException, IOException {
+    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
   protected InterpreterContext getInterpreterContext() {
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
index 0d84434..8017840 100644
--- a/flink/src/test/resources/log4j.properties
+++ b/flink/src/test/resources/log4j.properties
@@ -15,11 +15,12 @@
 # limitations under the License.
 #
 
-log4j.rootLogger = WARN, stdout
+log4j.rootLogger = INFO, stdout
 
 log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
 log4j.logger.org.apache.hive=WARN
+log4j.logger.org.apache.flink=WARN
 
diff --git a/flink/src/test/resources/log4j2.properties b/flink/src/test/resources/log4j2.properties
index cf94a3e..1bce906 100755
--- a/flink/src/test/resources/log4j2.properties
+++ b/flink/src/test/resources/log4j2.properties
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-status = WARN
+status = INFO
 name = HiveLog4j2
 packages = org.apache.hadoop.hive.ql.log
 
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index 0b55018..e403a59 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import org.apache.commons.exec.CommandLine;
@@ -60,7 +59,7 @@ public class PythonInterpreter extends Interpreter {
   private static final int MAX_TIMEOUT_SEC = 30;
 
   private GatewayServer gatewayServer;
-  private PythonProcessLauncher pythonProcessLauncher;
+  protected PythonProcessLauncher pythonProcessLauncher;
   private File pythonWorkDir;
   protected boolean useBuiltinPy4j = true;
 
@@ -163,7 +162,6 @@ public class PythonInterpreter extends Interpreter {
     }
   }
 
-  @VisibleForTesting
   public PythonProcessLauncher getPythonProcessLauncher() {
     return pythonProcessLauncher;
   }
@@ -572,7 +570,7 @@ public class PythonInterpreter extends Interpreter {
     LOGGER.debug("Python Process Output: " + message);
   }
 
-  class PythonProcessLauncher extends ProcessLauncher {
+  public class PythonProcessLauncher extends ProcessLauncher {
 
     PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
       super(commandLine, envs);
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index cabcabc..f3361a4 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -146,7 +146,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
     assertEquals(Code.ERROR, result.code());
     output = context.out.toInterpreterResultMessage().get(0);
     assertTrue(output.getData(),
-            output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+            output.getData().contains("Ipython kernel has been stopped. Please check logs. "
         + "It might be because of an out of memory issue."));
   }
 
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 5f8164f..dff1900 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.jupyter;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannelBuilder;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.environment.EnvironmentUtils;
@@ -213,7 +212,6 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
     return EnvironmentUtils.getProcEnvironment();
   }
 
-  @VisibleForTesting
   public JupyterKernelProcessLauncher getKernelProcessLauncher() {
     return jupyterKernelProcessLauncher;
   }