You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/18 16:22:59 UTC

[GitHub] [incubator-streampark] macksonmu opened a new pull request, #1866: Add macksonmu

macksonmu opened a new pull request, #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866

   <!--
   Thank you for contributing to StreamPark! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   ## Contribution Checklist
   
     - If this is your first time, please read our contributor guidelines: [Submit Code](https://streampark.apache.org/community/submit_guide/submit_code).
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/streampark/issues).
   
     - Name the pull request in the form "[Feature] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
   
     - If the PR is unfinished, add `[WIP]` in your PR title, e.g., `[WIP][Feature] Title of the pull request`.
   
   -->
   
   ## What changes were proposed in this pull request
   
   Issue Number: close #1781  <!-- REMOVE this line if no issue to close -->
   
   <!--(For example: This pull request proposed to add checkstyle plugin).-->
   
   ## Brief change log
   
   <!--*(for example:)*
   - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   
   ## Verifying this change
   
   <!--*(Please pick either of the following options)*-->
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
   - *Added integration tests for end-to-end.*
   - *Added *Test to verify the change.*
   - *Manually verified the change by testing locally.* -->
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): (yes / no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001917086


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
   > @1996fanrui @wolfboys is this okay?
   > 
   > ```
   > /**
   >      * Replace variable with defined variable codes.
   >      * @param teamId
   >      * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
   >      * @return
   >      */
   >     @Override
   >     public String replaceVariable(Long teamId, String mixed) {
   >         if (StringUtils.isEmpty(mixed)) {
   >             return mixed;
   >         }
   >         List<Variable> variables = findByTeamId(teamId);
   >         if (CollectionUtils.isEmpty(variables)) {
   >             return mixed;
   >         }
   >         Map<String, String> variableMap = variables.stream().collect(Collectors.toMap(Variable::getVariableCode, Variable::getVariableValue));
   >         String restore = mixed;
   >         Matcher matcher = placeholderPattern.matcher(restore);
   >         while (matcher.find()) {
   >             String placeholder = matcher.group();
   >             String variableCode = getCodeFromPlaceholder(placeholder);
   >             String variableVaule = variableMap.get(variableCode);
   >             if (StringUtils.isNotEmpty(variableVaule)) {
   >                 restore = restore.replace(placeholder, variableVaule);
   >             }
   >         }
   >         return restore;
   >     }
   > ```
   
   LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286515764

   > 
   
   good idea


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001896066


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
   Change variables to uppercase
   
   ``` 
   private final Pattern PLACEHOLDER_PATTERN = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
   
   private final String PLACEHOLDER_START = "${";
   
   private final String PLACEHOLDER_END = "}";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] lvshaokang commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
lvshaokang commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r998953920


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {

Review Comment:
   Could you add unit test for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001885123


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
   @1996fanrui @wolfboys is this okay?
   
   /**
        * Replace variable with defined variable codes.
        * @param teamId
        * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
        * @return
        */
       @Override
       public String replaceVariable(Long teamId, String mixed) {
           if (StringUtils.isEmpty(mixed)) {
               return mixed;
           }
           List<Variable> variables = findByTeamId(teamId);
           if (CollectionUtils.isEmpty(variables)) {
               return mixed;
           }
           Map<String, String> variableMap = variables.stream().collect(Collectors.toMap(Variable::getVariableCode, Variable::getVariableValue));
           String restore = mixed;
           Matcher matcher = placeholderPattern.matcher(restore);
           while (matcher.find()) {
               String placeholder = matcher.group();
               String variableCode = getCodeFromPlaceholder(placeholder);
               String variableVaule = variableMap.get(variableCode);
               if (StringUtils.isNotEmpty(variableVaule)) {
                   restore = restore.replace(placeholder, variableVaule);
               }
           }
           return restore;
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001730063


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,79 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");

Review Comment:
   If `placeholderStart` and `placeholderEnd ` can be configured, the placeholderPattern should be dynamic.
   
   And I'm afraid, there may be bugs here. 
   
   Hi @macksonmu @wolfboys , I don't recommend supporting configuration `placeholder.start` and `placeholder.end`. Because:
   
   - If placeholderStart is `a`, and `placeholderEnd` is `b`, it’s very easy to conflict.
   - It will also become harder if we want to automatically search for variables when we type $ in the future.
   
   We can support it if users really need it in the future. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r999524981


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        Iterator<Application> appIt = applications.iterator();
+        while (appIt.hasNext()) {
+            Application application = appIt.next();
+            Matcher matcher = placeholderPattern.matcher(application.getArgs());
+            if (matcher.find()) {

Review Comment:
   Please help check whether we should use `while` instead of `if`.
   
   I see the line 122 use while.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java:
##########
@@ -86,17 +83,12 @@ public RestResponse updateVariable(@Valid Variable variable) throws Exception {
     @DeleteMapping("delete")
     @RequiresPermissions("variable:delete")
     public RestResponse deleteVariables(@Valid Variable variable) throws Exception {

Review Comment:
   `deleteVariables` rename to `deleteVariable` ?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";
+
     @Autowired
     private ApplicationService applicationService;
 
+    @Autowired
+    private FlinkSqlService flinkSqlService;
+
     @Autowired
     private CommonService commonService;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void createVariable(Variable variable) throws Exception {
+    public void createVariable(Variable variable) {
         if (this.findByVariableCode(variable.getTeamId(), variable.getVariableCode()) != null) {
             throw new ApiAlertException("Sorry, the variable code already exists.");
         }
         variable.setCreatorId(commonService.getUserId());
         this.save(variable);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void deleteVariable(Variable variable) {
+        if (isDependByApplications(variable)) {
+            throw new ApiAlertException("Sorry, the variable is actually used.");

Review Comment:
   Should we tell users which applications is using the variable?
   
   When the cluster goes offline, no app should still use this variable, and the admin can get the app list by this alert.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        Iterator<Application> appIt = applications.iterator();
+        while (appIt.hasNext()) {
+            Application application = appIt.next();
+            Matcher matcher = placeholderPattern.matcher(application.getArgs());
+            if (matcher.find()) {
+                String placeholder = matcher.group();
+                String dependVariableCode = getCodeFromPlaceholder(placeholder);
+                if (variable.getVariableCode().equals(dependVariableCode)) {
+                    return true;
+                }
+            }
+        }
+
+        // Detect whether variables are dependent on all versions of flink sql
+        List<FlinkSql> sqls = flinkSqlService.getByTeamId(variable.getTeamId());
+        Iterator<FlinkSql> sqlIt = sqls.iterator();
+        while (sqlIt.hasNext()) {
+            FlinkSql sql = sqlIt.next();
+            Matcher matcher = placeholderPattern.matcher(DeflaterUtils.unzipString(sql.getSql()));
+            if (matcher.find()) {
+                String placeholder = matcher.group();
+                String dependVariableCode = getCodeFromPlaceholder(placeholder);
+                if (variable.getVariableCode().equals(dependVariableCode)) {
+                    return true;
+                }
+            }

Review Comment:
   From line `154-161` should can be extract a method. This method can be shared for args and sql content.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        Iterator<Application> appIt = applications.iterator();
+        while (appIt.hasNext()) {
+            Application application = appIt.next();
+            Matcher matcher = placeholderPattern.matcher(application.getArgs());

Review Comment:
   I'm not sure if we should use `matcher` or `String.contain` here.
   
   For example, variable code == kafka_cluster.  The `application.getArgs().contains("${kafka_cluster}")` may be more efficient than `matcher`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1000109426


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        Iterator<Application> appIt = applications.iterator();
+        while (appIt.hasNext()) {
+            Application application = appIt.next();
+            Matcher matcher = placeholderPattern.matcher(application.getArgs());
+            if (matcher.find()) {
+                String placeholder = matcher.group();
+                String dependVariableCode = getCodeFromPlaceholder(placeholder);
+                if (variable.getVariableCode().equals(dependVariableCode)) {
+                    return true;
+                }
+            }
+        }
+
+        // Detect whether variables are dependent on all versions of flink sql
+        List<FlinkSql> sqls = flinkSqlService.getByTeamId(variable.getTeamId());
+        Iterator<FlinkSql> sqlIt = sqls.iterator();
+        while (sqlIt.hasNext()) {
+            FlinkSql sql = sqlIt.next();
+            Matcher matcher = placeholderPattern.matcher(DeflaterUtils.unzipString(sql.getSql()));
+            if (matcher.find()) {
+                String placeholder = matcher.group();
+                String dependVariableCode = getCodeFromPlaceholder(placeholder);
+                if (variable.getVariableCode().equals(dependVariableCode)) {
+                    return true;
+                }
+            }

Review Comment:
   Good idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286449361

   hi @macksonmu:
   thanks for your hard work. have a little suggestion:
    Currently, placeholder-start char and end char is hard-coded , Can they be made configurable, They can be configured in application.yml, e.g:
    
   ```
   streampark:
     variable:
       placeholder-start: ${
       placeholder-end: }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001722386


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java:
##########
@@ -42,7 +42,6 @@ public class Variable implements Serializable {
     private String variableCode;
 
     @NotBlank(message = "{required}")
-    @Size(max = 50, message = "{noMoreThan}")

Review Comment:
   Why delete this code?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,79 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");

Review Comment:
   If `placeholderStart` and `placeholderEnd ` can be configured, the placeholderPattern should be dynamic.
   
   And I'm afraid, there may be bugs here. 
   
   Hi @macksonmu @wolfboys , I don't recommend supporting configuration `placeholder.start` and `placeholder.end`. Because:
   
   - If placeholderStart is `a`, and `placeholderEnd` is `b`, it’s very easy to conflict.
   - It will also become harder if we want to automatically search for variables when we type $ in the future.
   
   



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
   How about rename to `replaceVariable`?
   
   Your comment is clear: `Replace placeholders with defined variable codes.`
   
   And I don't think `paramWithPlaceholders` is a good name. What does `param` mean here? Is methodParam? Or do you think `sql` or `job args` are param? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001885123


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
   @1996fanrui @wolfboys is this okay?
   
   ```
   /**
        * Replace variable with defined variable codes.
        * @param teamId
        * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
        * @return
        */
       @Override
       public String replaceVariable(Long teamId, String mixed) {
           if (StringUtils.isEmpty(mixed)) {
               return mixed;
           }
           List<Variable> variables = findByTeamId(teamId);
           if (CollectionUtils.isEmpty(variables)) {
               return mixed;
           }
           Map<String, String> variableMap = variables.stream().collect(Collectors.toMap(Variable::getVariableCode, Variable::getVariableValue));
           String restore = mixed;
           Matcher matcher = placeholderPattern.matcher(restore);
           while (matcher.find()) {
               String placeholder = matcher.group();
               String variableCode = getCodeFromPlaceholder(placeholder);
               String variableVaule = variableMap.get(variableCode);
               if (StringUtils.isNotEmpty(variableVaule)) {
                   restore = restore.replace(placeholder, variableVaule);
               }
           }
           return restore;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1282667490

   @wolfboys @1996fanrui I created 11 variables in production and tested the flink jar and flink sql jobs of the real production business. I have not found any problems so far, please help to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1000122605


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";
+
     @Autowired
     private ApplicationService applicationService;
 
+    @Autowired
+    private FlinkSqlService flinkSqlService;
+
     @Autowired
     private CommonService commonService;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void createVariable(Variable variable) throws Exception {
+    public void createVariable(Variable variable) {
         if (this.findByVariableCode(variable.getTeamId(), variable.getVariableCode()) != null) {
             throw new ApiAlertException("Sorry, the variable code already exists.");
         }
         variable.setCreatorId(commonService.getUserId());
         this.save(variable);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void deleteVariable(Variable variable) {
+        if (isDependByApplications(variable)) {
+            throw new ApiAlertException("Sorry, the variable is actually used.");

Review Comment:
   I originally planned to implement the next PR. This requires a paginated page on the front end, and it may take two days to submit. Is the next PR possible?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r999578745


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java:
##########
@@ -86,17 +83,12 @@ public RestResponse updateVariable(@Valid Variable variable) throws Exception {
     @DeleteMapping("delete")
     @RequiresPermissions("variable:delete")
     public RestResponse deleteVariables(@Valid Variable variable) throws Exception {

Review Comment:
   yes it was my mistake



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001896066


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +110,74 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String parseVariable(Long teamId, String paramWithPlaceholders) {

Review Comment:
      Change variables to uppercase
   
    
      private final Pattern PLACEHOLDER_PATTERN = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
   
       private final String PLACEHOLDER_START = "${";
   
       private final String PLACEHOLDER_END = "}";



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys merged pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
wolfboys merged PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1000110882


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";
+
     @Autowired
     private ApplicationService applicationService;
 
+    @Autowired
+    private FlinkSqlService flinkSqlService;
+
     @Autowired
     private CommonService commonService;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void createVariable(Variable variable) throws Exception {
+    public void createVariable(Variable variable) {
         if (this.findByVariableCode(variable.getTeamId(), variable.getVariableCode()) != null) {
             throw new ApiAlertException("Sorry, the variable code already exists.");
         }
         variable.setCreatorId(commonService.getUserId());
         this.save(variable);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void deleteVariable(Variable variable) {
+        if (isDependByApplications(variable)) {
+            throw new ApiAlertException("Sorry, the variable is actually used.");

Review Comment:
   Yes, I was planning to do the next PR, since you brought it up, I can submit it together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286885978

   > hi @macksonmu: thanks for your hard work. have a little suggestion: Currently, placeholder-start char and end char is hard-coded , Can they be made configurable in application.yml? e.g:
   > 
   > ```
   > streampark:
   >   variable:
   >     placeholder-start: ${
   >     placeholder-end: }
   > ```
   
   If the placeholder is changed, all old variables in SQL or args will be invalidated. So I'm not sure whether we need to support change the `placeholder-start` and `placeholder-end`.
   
   I have 2 questions:
   1. Do users really need to modify it?
   2. If users really need it, how to compatible with old variables?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001408024


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";

Review Comment:
   I think yours is better



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";

Review Comment:
   I think yours is better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r999583825


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {

Review Comment:
   ok i will do it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1285165412

   
   > @1996fanrui @lvshaokang I've made the fix as suggested, thanks to both of you for your hard work.
   
   thanks for your hard work. I'll review it later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1000109636


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,65 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        Iterator<Application> appIt = applications.iterator();
+        while (appIt.hasNext()) {
+            Application application = appIt.next();
+            Matcher matcher = placeholderPattern.matcher(application.getArgs());
+            if (matcher.find()) {

Review Comment:
   it was my mistake



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1000110882


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";
+
     @Autowired
     private ApplicationService applicationService;
 
+    @Autowired
+    private FlinkSqlService flinkSqlService;
+
     @Autowired
     private CommonService commonService;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void createVariable(Variable variable) throws Exception {
+    public void createVariable(Variable variable) {
         if (this.findByVariableCode(variable.getTeamId(), variable.getVariableCode()) != null) {
             throw new ApiAlertException("Sorry, the variable code already exists.");
         }
         variable.setCreatorId(commonService.getUserId());
         this.save(variable);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void deleteVariable(Variable variable) {
+        if (isDependByApplications(variable)) {
+            throw new ApiAlertException("Sorry, the variable is actually used.");

Review Comment:
   Yes, I was planning to do the next PR, since you brought it up, I can submit it together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286919002

   hi macksonmu:
   I'm sorry. hard-coded placeholder-start char and placeholder-end char is the better way. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001862564


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java:
##########
@@ -42,7 +42,6 @@ public class Variable implements Serializable {
     private String variableCode;
 
     @NotBlank(message = "{required}")
-    @Size(max = 50, message = "{noMoreThan}")

Review Comment:
   I added the real variables of the production system and found that 50 characters are really not enough. For example, a kafka cluster has 20 ips



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001406909


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        if (applications != null) {
+            Iterator<Application> appIt = applications.iterator();
+            while (appIt.hasNext()) {
+                Application application = appIt.next();
+                if (isDepend(variable.getVariableCode(), application.getArgs())) {
+                    return true;
+                }
+            }

Review Comment:
   You are right, I originally wanted to use sqlIt.remove to get the list of dependencies, but it will be reflected in the next PR



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        if (applications != null) {
+            Iterator<Application> appIt = applications.iterator();
+            while (appIt.hasNext()) {
+                Application application = appIt.next();
+                if (isDepend(variable.getVariableCode(), application.getArgs())) {
+                    return true;
+                }
+            }
+        }
+
+        // Detect whether variables are dependent on all versions of flink sql
+        List<FlinkSql> sqls = flinkSqlService.getByTeamId(variable.getTeamId());
+        if (sqls != null) {
+            Iterator<FlinkSql> sqlIt = sqls.iterator();

Review Comment:
   You are right, I originally wanted to use sqlIt.remove to get the list of dependencies, but it will be reflected in the next PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001412649


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {

Review Comment:
   I choose parseVariable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001347675


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        if (applications != null) {
+            Iterator<Application> appIt = applications.iterator();
+            while (appIt.hasNext()) {
+                Application application = appIt.next();
+                if (isDepend(variable.getVariableCode(), application.getArgs())) {
+                    return true;
+                }
+            }
+        }
+
+        // Detect whether variables are dependent on all versions of flink sql
+        List<FlinkSql> sqls = flinkSqlService.getByTeamId(variable.getTeamId());
+        if (sqls != null) {
+            Iterator<FlinkSql> sqlIt = sqls.iterator();

Review Comment:
   code can be simplified too



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {
+        if (StringUtils.isEmpty(paramWithPlaceholders)) {
+            return paramWithPlaceholders;
+        }
+        String restore = paramWithPlaceholders;
+        Matcher matcher = placeholderPattern.matcher(paramWithPlaceholders);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            Variable variable = findByVariableCode(teamId, variableCode);
+            if (variable != null) {
+                restore = restore.replace(placeholder, variable.getVariableValue());
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the application
+        List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
+        if (applications != null) {
+            Iterator<Application> appIt = applications.iterator();
+            while (appIt.hasNext()) {
+                Application application = appIt.next();
+                if (isDepend(variable.getVariableCode(), application.getArgs())) {
+                    return true;
+                }
+            }

Review Comment:
   code(line 137 ~ 143) can be simplified:
   ```
   for(Application app: applications) {
         if (isDepend(variable.getVariableCode(), app.getArgs())) {
             return true;
         }
     }
     ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";
+
+    private final String placeholderRight  = "}";

Review Comment:
   how about: placeholderEnd?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,74 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private final String placeholderLeft = "${";

Review Comment:
   how about: placeholderStart?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -79,4 +105,73 @@ public Variable findByVariableCode(Long teamId, String variableCode) {
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace placeholders with defined variable codes.
+     * @param teamId
+     * @param paramWithPlaceholders Parameters with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replacePlaceholder(Long teamId, String paramWithPlaceholders) {

Review Comment:
   bad method name: `replacePlaceholder` , how about: `parseVariable` or `parse` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286516503

   > hi @macksonmu: thanks for your hard work. have a little suggestion: Currently, placeholder-start char and end char is hard-coded , Can they be made configurable in application.yml? e.g:
   > 
   > ```
   > streampark:
   >   variable:
   >     placeholder-start: ${
   >     placeholder-end: }
   > ```
   good idea
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1285008862

   @1996fanrui @lvshaokang I've made the fix as suggested, thanks to both of you for your hard work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001744378


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,79 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");

Review Comment:
   > If `placeholderStart` and `placeholderEnd ` can be configured, the placeholderPattern should be dynamic.
   > 
   > And I'm afraid, there may be bugs here.
   > 
   > Hi @macksonmu @wolfboys , I don't recommend supporting configuration `placeholder.start` and `placeholder.end`. Because:
   > 
   > * If placeholderStart is `a`, and `placeholderEnd` is `b`, it’s very easy to conflict.
   > * It will also become harder if we want to automatically search for variables when we type $ in the future.
   
   
   I agree with you, placeholder.start and placeholder.end supporting configurable, It does cause unpredictable problems that are difficult to solve. 
    
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] macksonmu commented on pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL #1781

Posted by GitBox <gi...@apache.org>.
macksonmu commented on PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#issuecomment-1286851704

   @wolfboys According to your suggestion, I have made the modification and verified it with the official production business.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1866: [Feature] Reference variables as placeholders in program args and Flink SQL

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1866:
URL: https://github.com/apache/incubator-streampark/pull/1866#discussion_r1001739705


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java:
##########
@@ -17,48 +17,79 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> implements VariableService {
 
+    private final Pattern placeholderPattern = Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");

Review Comment:
   oh, There is a bug in the definition of placeholderPattern:
   
   ```
       private Pattern placeholderPattern = null;
   
       @PostConstruct
       private void initPlaceholderPattern() {
           String regexpStart = placeholderStart.replace("", "\\")
               .replaceFirst("\\\\$", "");
           String regexpEnd = placeholderEnd.replace("", "\\")
               .replaceFirst("\\\\$", "");
           String regexpContent = "([A-Za-z])+([A-Za-z0-9._-])+";
           this.placeholderPattern = Pattern.compile(regexpStart + regexpContent + regexpEnd);
       }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org