Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/04/26 13:37:36 UTC

[GitHub] [incubator-seatunnel] legendtkl opened a new issue, #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl opened a new issue, #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753

   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   Currently, the shell script `start-seatunnel-sql.sh` starts a Flink SQL job. But it seems the config file contains only the SQL script, so there is no place to set env (dynamic configuration).
   
   I propose extending the config file so that it can also contain env settings.
   
   PS: `start-seatunnel-sql.sh` seems to work somewhat differently from `start-seatunnel-flink.sh` and `start-seatunnel-spark.sh`. Will this code be refactored in the long term?
   
   ### Usage Scenario
   
   Run a Flink SQL job.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ruanwenjun commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1109964704

   To be honest, I would like to remove the `Flink sql` job type, since I think users can submit SQL directly with the Flink SQL client.




[GitHub] [incubator-seatunnel] legendtkl commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1112855183

   A deprecated API is currently used:
   ![image](https://user-images.githubusercontent.com/2370761/165883497-f1480703-5251-46d7-a249-53439c077502.png)
   ![image](https://user-images.githubusercontent.com/2370761/165883516-07ca0ee2-08a8-4ed5-9859-690c7a73a198.png)
   




[GitHub] [incubator-seatunnel] legendtkl commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1112836256

   Hi @ruanwenjun @BenJFan, I have just subscribed to the dev mailing list and sent a proposal with the subject "[Proposal] Flink SQL Improvement".
   
   Could you help take a look at it?
   
   Thanks,
   Kelu




[GitHub] [incubator-seatunnel] ruanwenjun commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

ruanwenjun commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1109984313

   > Hi @ruanwenjun, thanks for the reply.
   > 
   > In my opinion, the Flink SQL client is suitable for interactive use, for example testing or notebooks.
   > 
   > For production, we would like to use Flink SQL + application mode to build more robust, better-isolated long-running jobs, such as ETL, data integration services, and so on.
   > 
   > In the long term, Flink SQL would be a more user-friendly programming paradigm than the DataStream/DataSet API.
   > 
   > That is my point. I hope SeaTunnel will support Flink SQL.
   > 
   > Furthermore, our team would like to help build out this direction if we keep it. There is a lot of work to do, such as connectors, UDFs, catalogs, the SQL parser, and so on.
   > 
   > Looking forward to more discussion.
   > 
   > Thanks, Kelu
   
   You are welcome to improve this module :).




[GitHub] [incubator-seatunnel] legendtkl commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1109981972

   Hi @ruanwenjun, thanks for the reply.
   
   In my opinion, the Flink SQL client is suitable for interactive use, for example testing or notebooks.
   
   For production, we would like to use Flink SQL + application mode to build more robust, better-isolated long-running jobs, such as ETL, data integration services, and so on.
   
   In the long term, Flink SQL would be a more user-friendly programming paradigm than the DataStream/DataSet API.
   
   That is my point. I hope SeaTunnel will support Flink SQL.
   
   Furthermore, our team would like to help build out this direction if we keep it. There is a lot of work to do, such as connectors, UDFs, catalogs, the SQL parser, and so on.
   
   Looking forward to more discussion.
   
   Thanks,
   Kelu
   
   




[GitHub] [incubator-seatunnel] legendtkl commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1109810606

   Hi @ruanwenjun, or anyone else, could you give me some feedback on this?
   
   I can work on it.
   
   Thanks,
   Kelu




[GitHub] [incubator-seatunnel] quanzhian commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

quanzhian commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1114500371

   @legendtkl 
   I tried your idea. Here is why I didn't use it:
   1. Parsing the content fails, so the parsing method would need to be extended.
   2. The syntax becomes inconsistent. Originally only SQL syntax was used, but now the file becomes a mixture of several grammars, which is a bit of a hodgepodge.
   3. Readability and maintainability are poor.
   
   Why I use a SQL-like method to set the Flink configuration:
   1. For parsing, you only need to parse the contents of an ordinary SQL file.
   2. The configuration is written in SQL syntax, so everything stays consistent.
   3. Readability and maintainability are good.
   
   
   




[GitHub] [incubator-seatunnel] BenJFan commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

BenJFan commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1110452487

   > @legendtkl Which env do you want to set? If you want to set system environment variables, you can set them in [`seatunnel-env.sh`](https://github.com/apache/incubator-seatunnel/blob/dev/config/seatunnel-env.sh); if you want to set Flink SQL configuration, e.g. `table.exec.mini-batch.enabled` or `table.exec.mini-batch.size`, you can set it directly in [`flink.sql.conf.template`](https://github.com/apache/incubator-seatunnel/blob/dev/config/flink.sql.conf.template).
   > 
   > You are welcome to refactor this module if you have any ideas. But before you start, it's better to send your proposal to [dev@seatunnel.apache.org](mailto:dev@seatunnel.apache.org) with an email title starting with [PROPOSAL].
   > 
   > PS: to be honest, I want to remove the `seatunnel-core-flink-sql` module, since I think it's more convenient for users to submit SQL directly with the Flink SQL client. cc @CalvinKirs @BenJFan
   
   Agree with you. The `seatunnel-core-flink-sql` module is the same as the original Flink client; we didn't add much to it. I think we can remove it.




[GitHub] [incubator-seatunnel] quanzhian commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

quanzhian commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1113105967

   I have an idea @legendtkl @ruanwenjun @BenJFan 
   
   Define a custom SET statement with the `flink_env.` prefix, e.g. `SET flink_env.execution.parallelism = 1;`:
   
   ```sql
   --
   -- Licensed to the Apache Software Foundation (ASF) under one or more
   -- contributor license agreements.  See the NOTICE file distributed with
   -- this work for additional information regarding copyright ownership.
   -- The ASF licenses this file to You under the Apache License, Version 2.0
   -- (the "License"); you may not use this file except in compliance with
   -- the License.  You may obtain a copy of the License at
   --
   --     http://www.apache.org/licenses/LICENSE-2.0
   --
   -- Unless required by applicable law or agreed to in writing, software
   -- distributed under the License is distributed on an "AS IS" BASIS,
   -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   -- See the License for the specific language governing permissions and
   -- limitations under the License.
   --
   
   --
   -- This config file is a demonstration of sql processing in SeaTunnel config
   --
   --
   
   SET flink_env.execution.parallelism = 1;
   SET flink_env.execution.checkpoint.interval = 10000;
   SET flink_env.execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint";
   
   SET 'table.dml-sync' = 'true';
   
   CREATE TABLE events (
     f_type INT,
     f_uid INT,
     ts AS localtimestamp,
     WATERMARK FOR ts AS ts
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_type.min'='1',
     'fields.f_type.max'='5',
     'fields.f_uid.min'='1',
     'fields.f_uid.max'='1000'
   );
   
   CREATE TABLE print_table (
     type INT,
     uid INT,
     lstmt TIMESTAMP
   ) WITH (
     'connector' = 'print',
     'sink.parallelism' = '1'
   );
   
   INSERT INTO print_table SELECT * FROM events where f_type = 1;
   
   ```
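   For illustration only, here is a minimal, stdlib-only sketch of the `flink_env.` prefix rule above. It assumes each statement ends with `;` at the end of a line and that comments are whole `--` lines. `FlinkEnvScript` is a hypothetical class, not SeaTunnel's `SqlStatementSplitter` or any existing API.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   // Hypothetical sketch: separates "SET flink_env.<key> = <value>" statements from ordinary SQL.
   final class FlinkEnvScript {
   
       // Whole statement must be "SET flink_env.<key> = <value>", case-insensitive; the key may not contain '='.
       private static final Pattern FLINK_ENV_SET = Pattern.compile(
               "^SET\\s+flink_env\\.([^\\s=]+)\\s*=\\s*(.*)$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
   
       final Map<String, String> env = new LinkedHashMap<>(); // keeps declaration order
       final List<String> sql = new ArrayList<>();            // statements without the trailing ';'
   
       static FlinkEnvScript parse(String script) {
           FlinkEnvScript result = new FlinkEnvScript();
           StringBuilder current = new StringBuilder();
           for (String line : script.split("\\R")) {
               String trimmed = line.trim();
               if (trimmed.startsWith("--")) {
                   continue; // skip whole-line comments such as the license header
               }
               current.append(line).append('\n');
               // Naive rule: a statement ends at a line whose last character is ';'.
               if (trimmed.endsWith(";")) {
                   result.addStatement(current.toString());
                   current.setLength(0);
               }
           }
           result.addStatement(current.toString()); // a final statement without ';'
           return result;
       }
   
       private void addStatement(String raw) {
           String stmt = raw.trim();
           if (stmt.endsWith(";")) {
               stmt = stmt.substring(0, stmt.length() - 1).trim();
           }
           if (stmt.isEmpty()) {
               return;
           }
           Matcher m = FLINK_ENV_SET.matcher(stmt);
           if (m.matches()) {
               env.put(m.group(1), unquote(m.group(2).trim()));
           } else {
               sql.add(stmt); // includes ordinary SET statements such as SET 'table.dml-sync' = 'true'
           }
       }
   
       // Removes one pair of surrounding single or double quotes, if present.
       private static String unquote(String v) {
           boolean quoted = v.length() >= 2
                   && ((v.startsWith("\"") && v.endsWith("\"")) || (v.startsWith("'") && v.endsWith("'")));
           return quoted ? v.substring(1, v.length() - 1) : v;
       }
   }
   ```
   
   A semicolon inside a string literal at the end of a line would wrongly end a statement, so a real implementation would need a proper tokenizer.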
   
   
   
   This is my implementation and a test run:
   
   ```
   [xxx@bigdata-app03 apache-seatunnel-incubating-2.1.1-SNAPSHOT]# ./bin/start-seatunnel-sql.sh -m yarn-cluster -ys 1 -yjm 2G -ytm 3G -ynm seatunnel_flink_job --config /mnt/services/seatunnel/flink_sql_01.conf
   
   Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in [jar:file:/mnt/services/flink-1.12.1/lib/log4j-slf4j-impl-2.17.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in [jar:file:/usr/hdp/3.1.4.0-315/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
   
    ############# print args ############# 
   --config
   /mnt/services/seatunnel/flink_sql_01.conf
   
   
    ############# print env ############# 
   execution.parallelism = 1
   execution.checkpoint.interval = 10000
   execution.planner = blink
   
   
    ############# print sql ############# 
   SET 'table.local-time-zone' = 'Asia/Shanghai'
   CREATE TABLE events (
     f_type INT,
     f_uid INT,
     ts AS localtimestamp,
     WATERMARK FOR ts AS ts
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_type.min'='1',
     'fields.f_type.max'='5',
     'fields.f_uid.min'='1',
     'fields.f_uid.max'='1000'
   )
   CREATE TABLE print_table (
     type INT,
     uid INT,
     lstmt TIMESTAMP
   ) WITH (
     'connector' = 'print'
   )
   INSERT INTO print_table SELECT * FROM events where f_type = 1
   
   #############setConfiguration#############
     key='table.local-time-zone' value= 'Asia/Shanghai'
   2022-04-29 15:00:50,664 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at bigdata-master01/172.18.247.15:10200
   2022-04-29 15:00:50,673 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2022-04-29 15:00:50,794 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-315/0/resource-types.xml
   2022-04-29 15:00:50,853 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=3072, slotsPerTaskManager=1}
   2022-04-29 15:00:51,345 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
   2022-04-29 15:00:56,002 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1643094720025_43463
   2022-04-29 15:00:56,038 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1643094720025_43463
   2022-04-29 15:00:56,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
   2022-04-29 15:00:56,041 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
   2022-04-29 15:01:06,899 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
   2022-04-29 15:01:06,900 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata-datanode01:39023 of application 'application_1643094720025_43463'.
   
    ############# print flink applicationId ############# 
   application_1643094720025_43463
   
   Job has been submitted with JobID ccd2030e27d1fae4238f491d431a4c36
   
    ############# print jobId ############# 
   job-submitted-success:ccd2030e27d1fae4238f491d431a4c36
   
   ```
   
   
   
   This is my core code:
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.seatunnel.core.sql.job;
   
   import org.apache.seatunnel.common.utils.VariablesSubstitute;
   import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
   
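   import java.util.ArrayList;
   import java.util.List;
   import java.util.Locale;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;
   
   public class JobInfo {
   
       private static final String DELIMITER = "=";
   
       // Matches the custom "SET flink_env." prefix; compiled once rather than once per statement.
       private static final Pattern FLINK_ENV_PATTERN = Pattern.compile(
               FlinkSqlConstant.PATTERN_FLINK_ENV_REGEX, FlinkSqlConstant.DEFAULT_PATTERN_FLAGS);
   
       private String jobContent;
   
       // Env settings with the "SET flink_env." prefix removed, e.g. "execution.parallelism = 1".
       private final List<String> flinkEnvList = new ArrayList<>();
   
       // All other statements, including ordinary SET statements such as SET 'table.dml-sync' = 'true'.
       private final List<String> flinkSqlList = new ArrayList<>();
   
       public JobInfo(String jobContent) {
           this.jobContent = jobContent;
       }
   
       public String getJobContent() {
           return jobContent;
       }
   
       public List<String> getFlinkEnvList() {
           return flinkEnvList;
       }
   
       public List<String> getFlinkSqlList() {
           return flinkSqlList;
       }
   
       public void substitute(List<String> variables) {
           // Split each "key=value" on the first '=' only, so values containing '=' (or empty values) survive;
           // if a key is given twice, the last value wins instead of toMap() throwing.
           Map<String, String> substituteMap = variables.stream()
                   .filter(v -> v.contains(DELIMITER))
                   .map(v -> v.split(DELIMITER, 2))
                   .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1], (first, last) -> last));
           jobContent = VariablesSubstitute.substitute(jobContent, substituteMap);
           this.analysisJobContent();
       }
   
       private void analysisJobContent() {
           // Start from empty lists so that calling substitute() twice does not duplicate statements.
           flinkEnvList.clear();
           flinkSqlList.clear();
           List<String> stmts = SqlStatementSplitter.normalizeStatements(this.jobContent);
           for (String stmt : stmts) {
               Matcher matcher = FLINK_ENV_PATTERN.matcher(stmt);
               // An env setting must start with SET and contain the "SET flink_env." prefix.
               if (stmt.trim().toUpperCase(Locale.ROOT).startsWith(FlinkSqlConstant.FLINK_SQL_SET_PREFIX)
                       && matcher.find()) {
                   // replaceAll() resets the matcher and strips the prefix, leaving "key = value".
                   flinkEnvList.add(matcher.replaceAll(""));
               } else {
                   flinkSqlList.add(stmt);
               }
           }
           // LogPrint is not included in this comment; it is assumed to live in the same package.
           LogPrint.envPrint(flinkEnvList);
           LogPrint.sqlPrint(flinkSqlList);
       }
   
   }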
   
   ```
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.seatunnel.core.sql.job;
   
   import java.util.regex.Pattern;
   
   public final class FlinkSqlConstant {
   
       public static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
   
       public static final int FLINK_SQL_SET_OPERANDS = 3;
   
       public static final String QUERY_JOBID_KEY_WORD = "job-submitted-success:";
   
       public static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");
   
       public static final String PATTERN_FLINK_ENV_REGEX = "SET\\s+flink_env\\.";
   
       public static final String FLINK_SQL_SET_PREFIX = "SET";
   
       private FlinkSqlConstant() { }
   }
   
   ```
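   For illustration only: the entries collected in `flinkEnvList` keep the form `key = value` (compare the `print env` output above). A hypothetical helper could convert them into typed settings before they are applied. In a real job that would happen through Flink's `StreamExecutionEnvironment#setParallelism(int)` and `#enableCheckpointing(long)`. `FlinkEnvSettings` is an assumed name, and only the two keys used in this thread are handled.
   
   ```java
   import java.util.List;
   import java.util.Locale;
   
   // Hypothetical holder for the env keys used in this thread; null means "not set".
   final class FlinkEnvSettings {
       Integer parallelism;       // execution.parallelism
       Long checkpointIntervalMs; // execution.checkpoint.interval
   
       static FlinkEnvSettings fromEntries(List<String> entries) {
           FlinkEnvSettings settings = new FlinkEnvSettings();
           for (String entry : entries) {
               // Split on the first '=' only; values such as URIs may themselves contain '='.
               String[] kv = entry.split("=", 2);
               if (kv.length != 2) {
                   throw new IllegalArgumentException("Expected 'key = value', got: " + entry);
               }
               String key = kv[0].trim().toLowerCase(Locale.ROOT);
               String value = kv[1].trim();
               switch (key) {
                   case "execution.parallelism":
                       settings.parallelism = Integer.parseInt(value);
                       break;
                   case "execution.checkpoint.interval":
                       settings.checkpointIntervalMs = Long.parseLong(value);
                       break;
                   default:
                       // Other keys (e.g. execution.planner) are ignored in this sketch.
                       break;
               }
           }
           return settings;
       }
   }
   ```
   
   Failing fast on a malformed entry or a non-numeric value surfaces config mistakes at submit time rather than after the YARN application has been deployed.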
   
   
   
   




[GitHub] [incubator-seatunnel] legendtkl commented on issue #1753: [Feature][seatunnal-flink-sql] Support dynamic config for Flink SQL mode job

legendtkl commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1113349717

   Hi @quanzhian, thanks for your reply. The workaround does work.
   
   But in my opinion, we could split the config into several sub-configs, as follows. That would be more flexible. What do you think?
   ```text
   env {
     execution.parallelism = 1
   }
   
   sql {
     CREATE TABLE events (
       f_type INT,
       f_uid INT,
       ts AS localtimestamp,
       WATERMARK FOR ts AS ts
     ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='5',
       'fields.f_type.min'='1',
       'fields.f_type.max'='5',
       'fields.f_uid.min'='1',
       'fields.f_uid.max'='1000'
     )
     CREATE TABLE print_table (
       type INT,
       uid INT,
       lstmt TIMESTAMP
     ) WITH (
       'connector' = 'print'
     )
     INSERT INTO print_table SELECT * FROM events where f_type = 1
   }
   
   connectors {
     ...
   }
   ```
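   For illustration only, here is a minimal sketch of how the proposed `env { }` / `sql { }` / `connectors { }` layout could be split into named sections with stdlib Java. It matches braces and skips over quoted literals such as `'{x}'`. `SectionedConfig` is hypothetical. It does not handle `--` comments that contain quotes or braces, and each section body (e.g. `env`) would still need its own parsing.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   // Hypothetical sketch: splits top-level "name { ... }" sections by brace matching.
   final class SectionedConfig {
   
       static Map<String, String> sections(String text) {
           Map<String, String> result = new LinkedHashMap<>(); // keeps section order
           int i = 0;
           while (i < text.length()) {
               int open = text.indexOf('{', i);
               if (open < 0) {
                   break; // no more sections
               }
               String name = text.substring(i, open).trim();
               int depth = 0;
               int close = -1;
               char quote = 0; // active quote character, 0 when outside a literal
               for (int j = open; j < text.length() && close < 0; j++) {
                   char c = text.charAt(j);
                   if (quote != 0) {
                       if (c == quote) {
                           quote = 0; // SQL's '' escape closes and reopens, so it stays balanced
                       }
                   } else if (c == '\'' || c == '"') {
                       quote = c;
                   } else if (c == '{') {
                       depth++;
                   } else if (c == '}' && --depth == 0) {
                       close = j;
                   }
               }
               if (close < 0) {
                   throw new IllegalArgumentException("Unbalanced braces in section: " + name);
               }
               result.put(name, text.substring(open + 1, close).trim());
               i = close + 1;
           }
           return result;
       }
   }
   ```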
   
   

