Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/03/19 14:05:19 UTC

[zeppelin] branch master updated: [ZEPPELIN-4688]. Support set statement in flink sql

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new dc45a89  [ZEPPELIN-4688]. Support set statement in flink sql
dc45a89 is described below

commit dc45a899dce0a7307663ae5ade51325763dc7f00
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 19 14:28:55 2020 +0800

    [ZEPPELIN-4688]. Support set statement in flink sql
    
    ### What is this PR for?
    This PR adds support for the SET statement in Flink SQL. Properties set via a SET statement only affect the current paragraph; they are reset after the paragraph finishes.
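
    For example (an illustrative snippet; the statements mirror the tests added in this commit), the following paragraph raises the default parallelism only for the insert in the same paragraph:

        SET table.exec.resource.default-parallelism=10;
        INSERT INTO sink_table SELECT * FROM source_table;

    Once the paragraph finishes, table.exec.resource.default-parallelism is restored to its default value.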
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4688
    
    ### How should this be tested?
    * CI Pass
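
    To run the new test locally, something like the following Maven invocation should work (module path taken from this commit's diff; exact flags may vary by environment):

        mvn -pl flink test -Dtest=FlinkBatchSqlInterpreterTest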
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3693 from zjffdu/ZEPPELIN-4688 and squashes the following commits:
    
    790b241db [Jeff Zhang] [ZEPPELIN-4688]. Support set statement in flink sql
---
 flink/pom.xml                                      |  4 +
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 87 ++++++++++++++++++++--
 .../flink/FlinkBatchSqlInterpreterTest.java        | 59 +++++++++++++++
 3 files changed, 144 insertions(+), 6 deletions(-)

diff --git a/flink/pom.xml b/flink/pom.xml
index 9be4fe5..21ec182 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -492,6 +492,10 @@
           <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index d794b4b..82ac50e 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -22,12 +22,16 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,15 +46,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class FlinkSqlInterrpeter extends Interpreter {
@@ -69,6 +72,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
           .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
           .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
           .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
           .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
           .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
           .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
@@ -86,7 +90,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   private SqlSplitter sqlSplitter;
   private int defaultSqlParallelism;
   private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
-
+  // All the available table/SQL config options; see
+  // https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
+  private Map<String, ConfigOption> tableConfigOptions;
+  // config options set via SET statements in the current paragraph
+  private Map<String, String> currentConfigOptions = new HashMap<>();
 
   public FlinkSqlInterrpeter(Properties properties) {
     super(properties);
@@ -117,6 +125,31 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
+    this.tableConfigOptions = extractTableConfigOptions();
+  }
+
+  private Map<String, ConfigOption> extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
   }
 
   @Override
@@ -139,6 +172,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   }
 
   private InterpreterResult runSqlList(String st, InterpreterContext context) {
+    currentConfigOptions.clear();
     List<String> sqls = sqlSplitter.splitSql(st);
     for (String sql : sqls) {
       Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
@@ -210,6 +244,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       case SELECT:
         callSelect(cmdCall.operands[0], context);
         break;
+      case SET:
+        callSet(cmdCall.operands[0], cmdCall.operands[1], context);
+        break;
       case INSERT_INTO:
       case INSERT_OVERWRITE:
         callInsertInto(cmdCall.operands[0], context);
@@ -401,25 +438,47 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   public void callSelect(String sql, InterpreterContext context) throws IOException {
     try {
       lock.lock();
+      // set parallelism from paragraph local property
       if (context.getLocalProperties().containsKey("parallelism")) {
         this.tbenv.getConfig().getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                         Integer.parseInt(context.getLocalProperties().get("parallelism")));
       }
-      callInnerSelect(sql, context);
 
+      // apply the config options collected from SET statements so far
+      for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+        this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+      }
+      callInnerSelect(sql, context);
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
+      // reset parallelism
       this.tbenv.getConfig().getConfiguration()
               .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                       defaultSqlParallelism);
+      // reset table config
+      for (ConfigOption configOption: tableConfigOptions.values()) {
+        // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+        if (configOption.defaultValue() != null) {
+          this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+        }
+      }
+      this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
     }
   }
 
   public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;
 
+  public void callSet(String key, String value, InterpreterContext context) throws IOException {
+    if (!tableConfigOptions.containsKey(key)) {
+      throw new IOException(key + " is not a valid table/sql config, please check link: " +
+              "https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
+    }
+    currentConfigOptions.put(key, value);
+  }
+
   private void callInsertInto(String sql,
                               InterpreterContext context) throws IOException {
      if (!isBatch()) {
@@ -432,6 +491,12 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
                  .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                          Integer.parseInt(context.getLocalProperties().get("parallelism")));
        }
+
+       // apply the config options collected from SET statements so far
+       for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+         this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+       }
+
        this.tbenv.sqlUpdate(sql);
        this.tbenv.execute(sql);
      } catch (Exception e) {
@@ -440,9 +505,19 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
        if (lock.isHeldByCurrentThread()) {
          lock.unlock();
        }
+
+       // reset parallelism
        this.tbenv.getConfig().getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                        defaultSqlParallelism);
+       // reset table config
+       for (ConfigOption configOption: tableConfigOptions.values()) {
+         // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+         if (configOption.defaultValue() != null) {
+           this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+         }
+       }
+       this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
      }
      context.out.write("Insertion successfully.\n");
   }
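
The option discovery above leans on the fact that Flink declares its table/SQL options as public static ConfigOption constants. Below is a minimal standalone sketch of that reflection scan (assuming flink-table-api-java 1.10.x on the classpath; the class name ConfigOptionScan is illustrative, and PythonOptions is omitted here to keep the dependencies small):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;
    import org.apache.flink.table.api.config.OptimizerConfigOptions;

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    public class ConfigOptionScan {
      public static void main(String[] args) throws Exception {
        Map<String, ConfigOption<?>> options = new HashMap<>();
        for (Class<?> clazz : new Class<?>[]{
                ExecutionConfigOptions.class, OptimizerConfigOptions.class}) {
          for (Field field : clazz.getDeclaredFields()) {
            // Option constants are declared as public static final ConfigOption fields.
            if (ConfigOption.class.isAssignableFrom(field.getType())) {
              // For static fields, the instance argument of Field#get is ignored.
              ConfigOption<?> option = (ConfigOption<?>) field.get(null);
              options.put(option.key(), option);
            }
          }
        }
        // Prints keys such as 'table.exec.resource.default-parallelism'.
        options.keySet().forEach(System.out::println);
      }
    }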
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 3fe35c5..c75d7fe 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -20,6 +20,8 @@ package org.apache.zeppelin.flink;
 
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -237,4 +239,61 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     //    resultMessages = context.out.toInterpreterResultMessage();
     //    assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
   }
+
+  @Test
+  public void testSetTableConfig() throws InterpreterException, IOException {
+    hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'a'), (2, 'b')");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE TABLE sink_table (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // set parallelism then insert into
+    InterpreterContext context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.exec.resource.default-parallelism=10;" +
+            "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+
+    // set then insert into
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.optimizer.source.predicate-pushdown-enabled=false;" +
+                    "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+    assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));
+
+    // invalid config
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.invalid_config=false;" +
+                    "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertTrue(resultMessages.get(0).getData(),
+            resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
+  }
 }
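
Taken together, the change boils down to a paragraph-scoped apply-then-reset lifecycle around each SQL block. A condensed, self-contained sketch of that pattern (illustrative names; not the actual interpreter code, which additionally handles locking, parallelism, and re-applies the user's Flink configuration after the reset):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    public class ParagraphScopedConfig {
      // filled by SET statements while the paragraph runs
      private final Map<String, String> pending = new HashMap<>();
      // all known options, e.g. gathered by the reflection scan shown earlier
      private final Map<String, ConfigOption> knownOptions;

      public ParagraphScopedConfig(Map<String, ConfigOption> knownOptions) {
        this.knownOptions = knownOptions;
      }

      @SuppressWarnings("unchecked")
      public void runParagraph(Configuration conf, Runnable sqlBody) {
        try {
          pending.forEach(conf::setString);  // apply paragraph-local overrides
          sqlBody.run();
        } finally {
          pending.clear();
          // restore every known option to its default so the next paragraph starts clean
          for (ConfigOption option : knownOptions.values()) {
            if (option.defaultValue() != null) {  // some options have no default
              conf.set(option, option.defaultValue());
            }
          }
        }
      }
    }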