You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/05/06 01:29:03 UTC

[incubator-seatunnel] branch dev updated: [Bug] [Spark] SeaTunnel on spark standalone cluster “plugins.tar.gz” decompress error. #1733 (#1757)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d02ab4b [Bug] [Spark] SeaTunnel on spark standalone cluster “plugins.tar.gz” decompress error. #1733 (#1757)
8d02ab4b is described below

commit 8d02ab4b00868b83d4f526b7f18c1566f7fe46d4
Author: quanzhian <57...@qq.com>
AuthorDate: Fri May 6 09:28:57 2022 +0800

    [Bug] [Spark] SeaTunnel on spark standalone cluster “plugins.tar.gz” decompress error. #1733 (#1757)
    
    * [Bug] [Spark] Can't run SeaTunnel on spark standalone cluster. #1733
    Fix the problem of "plugins.tar.gz" decompression failure in spark cluster mode
    
    * [Feature][seatunnel-flink-sql] Support dynamic config for Flink SQL mode job #1753
    
    * Revert "[Feature][seatunnel-flink-sql] Support dynamic config for Flink SQL mode job #1753"
    
    This reverts commit 42572be6
    
    * [bug][seatunnel-core-flink-sql] Fix a Flink version compatibility problem: validating the statement "SET 'table.dml-sync' = 'true';" with "stEnv.getParser().parse(stmt)" reports an error
    
    Co-authored-by: 全志安 <zh...@quadtalent.com>
---
 .../core/base/utils/CompressionUtils.java          |  9 ++--
 .../apache/seatunnel/core/sql/job/Executor.java    | 55 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
index f3c2d45e..312ea842 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
@@ -106,10 +106,7 @@ public final class CompressionUtils {
              final TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is)) {
             TarArchiveEntry entry = null;
             while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
-                final File outputFile = new File(outputDir, entry.getName());
-                if (!outputFile.toPath().normalize().startsWith(outputDir.toPath())) {
-                    throw new IllegalStateException("Bad zip entry");
-                }
+                final File outputFile = new File(outputDir, entry.getName()).toPath().normalize().toFile();
                 if (entry.isDirectory()) {
                     LOGGER.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
                     if (!outputFile.exists()) {
@@ -120,6 +117,10 @@ public final class CompressionUtils {
                     }
                 } else {
                     LOGGER.info("Creating output file {}.", outputFile.getAbsolutePath());
+                    File outputParentFile = outputFile.getParentFile();
+                    if (outputParentFile != null && !outputParentFile.exists()) {
+                        outputParentFile.mkdirs();
+                    }
                     final OutputStream outputFileStream = new FileOutputStream(outputFile);
                     IOUtils.copy(debInputStream, outputFileStream);
                     outputFileStream.close();
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
index c716d1e8..cdcab420 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.sql.job;
 
 import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,12 +28,17 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.command.SetOperation;
 
 import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class Executor {
 
+    private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
+    private static final int FLINK_SQL_SET_OPERANDS = 3;
+
     private Executor() {
         throw new IllegalStateException("Utility class");
     }
@@ -57,11 +63,15 @@ public class Executor {
 
         List<String> stmts = SqlStatementSplitter.normalizeStatements(workFlowContent);
         for (String stmt : stmts) {
+            Optional<String[]> optional = setOperationParse(stmt);
+            if (optional.isPresent()) {
+                String[] setOptionStrs = optional.get();
+                callSetOperation(configuration, setOptionStrs[0], setOptionStrs[1]);
+                continue;
+            }
             Operation op = stEnv.getParser().parse(stmt).get(0);
             if (op instanceof CatalogSinkModifyOperation) {
                 statementSet.addInsertSql(stmt);
-            } else if (op instanceof SetOperation) {
-                callSetOperation(configuration, (SetOperation) op);
             } else {
                 tEnv.executeSql(stmt);
             }
@@ -69,17 +79,38 @@ public class Executor {
         return statementSet;
     }
 
-    private static void callSetOperation(Configuration configuration, SetOperation setOperation) {
-
-        // set property
-        String key = setOperation.getKey()
-            .orElseThrow(() -> new IllegalArgumentException("key can not be empty!"))
-            .trim();
+    /** Matches {@code stmt} against the SET pattern; returns empty when it is not a SET statement. */
+    private static Optional<String[]> setOperationParse(String stmt) {
+        final int flags = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+        final Matcher setMatcher = Pattern.compile(FLINK_SQL_SET_MATCHING_REGEX, flags).matcher(stmt.trim());
+        if (!setMatcher.matches()) {
+            return Optional.empty();
+        }
+        final String[] captured = new String[setMatcher.groupCount()];
+        for (int group = 1; group <= captured.length; group++) {
+            captured[group - 1] = setMatcher.group(group);
+        }
+        return operandConverter(captured);
+    }
 
-        String value = setOperation.getValue()
-            .orElseThrow(() -> new IllegalArgumentException("value can not be empty!"))
-            .trim();
+    /** Extracts the trimmed key/value pair from the SET regex groups; a bare SET has no key and is rejected. */
+    private static Optional<String[]> operandConverter(String[] operands) {
+        if (operands.length < FLINK_SQL_SET_OPERANDS) {
+            return Optional.empty();
+        }
+        if (operands[0] == null) {
+            throw new IllegalArgumentException("key can not be empty!");
+        }
+        return Optional.of(new String[]{operands[1].trim(), operands[2].trim()});
+    }
 
+    /** Stores {@code key} = {@code value} in {@code configuration}, rejecting an empty key or value. */
+    private static void callSetOperation(Configuration configuration, String key, String value) {
+        if (StringUtils.isEmpty(key)) {
+            throw new IllegalArgumentException("key can not be empty!");
+        } else if (StringUtils.isEmpty(value)) {
+            throw new IllegalArgumentException("value can not be empty!");
+        }
         configuration.setString(key, value);
     }