You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/07/30 05:48:29 UTC

[zeppelin] branch master updated: [ZEPPELIN-4967]. hive dialect doesn't work in flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c4336e  [ZEPPELIN-4967]. hive dialect doesn't work in flink interpreter
9c4336e is described below

commit 9c4336e47d8503ccc1ae60b1a39287a460bdb880
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Jul 24 14:33:37 2020 +0800

    [ZEPPELIN-4967]. hive dialect doesn't work in flink interpreter
    
    ### What is this PR for?
    
    The root cause is that we only set table config in select and insert statement, but setting dialect is for DDL. This PR removes the paragraph configuration setting and instead sets the table config in global scope.
    
    ### What type of PR is it?
    [Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4967
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3867 from zjffdu/ZEPPELIN-4967 and squashes the following commits:
    
    c0f808216 [Jeff Zhang] [ZEPPELIN-4967]. hive dialect doesn't work in flink interpreter
---
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 22 +---------------------
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |  3 +++
 2 files changed, 4 insertions(+), 21 deletions(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 3214ca5..ab497c3 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -70,9 +70,6 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   // all the available sql config options. see
   // https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
   private Map<String, ConfigOption> tableConfigOptions;
-  // represent paragraph's tableConfig
-  // paragraphId --> tableConfig
-  private Map<String, Map<String, String>> paragraphTableConfigMap = new HashMap<>();
 
   public FlinkSqlInterrpeter(Properties properties) {
     super(properties);
@@ -128,10 +125,6 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   }
 
   private InterpreterResult runSqlList(String st, InterpreterContext context) {
-    // clear current paragraph's tableConfig before running any sql statements
-    Map<String, String> tableConfig = paragraphTableConfigMap.getOrDefault(context.getParagraphId(), new HashMap<>());
-    tableConfig.clear();
-    paragraphTableConfigMap.put(context.getParagraphId(), tableConfig);
 
     try {
       boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
@@ -485,12 +478,6 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   public void callSelect(String sql, InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      // set table config from set statement until now.
-      Map<String, String> paragraphTableConfig = paragraphTableConfigMap.get(context.getParagraphId());
-      for (Map.Entry<String, String> entry : paragraphTableConfig.entrySet()) {
-        this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
-      }
-
       callInnerSelect(sql, context);
     } finally {
       if (lock.isHeldByCurrentThread()) {
@@ -506,7 +493,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       throw new IOException(key + " is not a valid table/sql config, please check link: " +
               "https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
     }
-    paragraphTableConfigMap.get(context.getParagraphId()).put(key, value);
+    this.tbenv.getConfig().getConfiguration().setString(key, value);
   }
 
   public void callInsertInto(String sql,
@@ -516,13 +503,6 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
      }
      try {
        lock.lock();
-
-       // set table config from set statement until now.
-       Map<String, String> paragraphTableConfig = paragraphTableConfigMap.get(context.getParagraphId());
-       for (Map.Entry<String, String> entry : paragraphTableConfig.entrySet()) {
-         this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
-       }
-
        boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
        if (!runAsOne) {
          this.tbenv.sqlUpdate(sql);
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index ffe2fce..0b621ce 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -482,6 +482,9 @@ public abstract class SqlInterpreterTest {
               "set table.sql-dialect=hive", context);
       assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
+      sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" +
+              "partitioned by (dt string)", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     } else {
       // Flink1.10 doesn't support set table.sql-dialet which is introduced in flink 1.11
       InterpreterContext context = getInterpreterContext();