Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/19 14:44:28 UTC

[GitHub] [incubator-seatunnel] hailin0 opened a new pull request, #3145: [Feature][st-engine] Support for transform row data stream using transform-v2 API

hailin0 opened a new pull request, #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   
   Release binary package:
   - Output transform-* jars to the `transforms/` dir
   
   Plugin discovery:
   - Support loading transform-* jars from the `transform/*` dir
   
   Maven modules:
   - Add seatunnel-transform-v2 module
   - Add seatunnel-transform-v2-e2e module
   
   ST-Engine:
   - Support the Split, Replace, and Filter transforms
   
   E2E:
   - Support copying transform jars into the container
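The plugin-discovery item builds on Java's standard `ServiceLoader`. The `@AutoService(SeaTunnelTransform.class)` annotation on the transforms quoted below generates the `META-INF/services` registration, so implementations packaged in the transform jars can be found through a class loader built over those jars. The following is a minimal sketch of that mechanism, not SeaTunnel's actual discovery code. `NamedPlugin` is a hypothetical stand-in for `SeaTunnelTransform` that keeps only `getPluginName()`, and `PluginLookup`, its methods, and the exact-name match are assumptions.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical stand-in for SeaTunnelTransform: only the method needed for lookup.
interface NamedPlugin {
    String getPluginName();
}

// Hypothetical helper illustrating ServiceLoader-based plugin discovery.
final class PluginLookup {

    private PluginLookup() {
    }

    /** Builds a class loader over every *.jar directly inside pluginDir (non-recursive). */
    static URLClassLoader classLoaderFor(Path pluginDir, ClassLoader parent) throws IOException {
        List<URL> urls = new ArrayList<>();
        if (Files.isDirectory(pluginDir)) {
            try (DirectoryStream<Path> jars = Files.newDirectoryStream(pluginDir, "*.jar")) {
                for (Path jar : jars) {
                    urls.add(jar.toUri().toURL());
                }
            }
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    /** Returns the first registered implementation whose plugin name equals the given name. */
    static <T extends NamedPlugin> Optional<T> find(Class<T> type, ClassLoader loader, String name) {
        // ServiceLoader reads META-INF/services/<type name> entries visible to the loader.
        for (T plugin : ServiceLoader.load(type, loader)) {
            if (plugin.getPluginName().equals(name)) {
                return Optional.of(plugin);
            }
        }
        return Optional.empty();
    }
}
```

Scanning only the top level of the directory mirrors the `transform/*` wording above; a recursive scan comes up later in Hisoka-X's `Files.walk` comment.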
   
   ## Check list
   
   * [x] Code changes are covered by tests, or they do not need tests for the following reason:
   * [ ] If any new jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
TaoZex commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1003386158


##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelTransform.class)
+public class CopyFieldTransform extends SingleFieldOutputTransform {
+
+    private static final String SRC_FIELD = "src_field";
+    private static final String DEST_FIELD = "dest_field";
+
+    private String srcField;
+    private int srcFieldIndex;
+    private SeaTunnelDataType srcFieldDataType;
+    private String destField;
+
+    @Override
+    public String getPluginName() {
+        return "Copy";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(SRC_FIELD)) {
+            throw new IllegalArgumentException("The configuration missing key: " + SRC_FIELD);
+        }
+        if (!pluginConfig.hasPath(DEST_FIELD)) {
+            throw new IllegalArgumentException("The configuration missing key: " + DEST_FIELD);
+        }
+        this.srcField = pluginConfig.getString(SRC_FIELD);
+        this.destField = pluginConfig.getString(DEST_FIELD);
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType inputRowType) {
+        srcFieldIndex = inputRowType.indexOf(srcField);
+        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
+    }
+
+    @Override
+    protected String getOutputFieldName() {
+        return destField;
+    }
+
+    @Override
+    protected SeaTunnelDataType getOutputFieldDataType() {
+        return srcFieldDataType;
+    }
+
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        return inputRow.getField(srcFieldIndex);

Review Comment:
   Throw an exception in case srcField does not exist.
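One way to address this is to resolve the source field index once, when the input row type is set, and fail fast there with a descriptive message. hailin0's reply links to `SeaTunnelRowType` line 85, which suggests `indexOf` may already reject unknown names; that is not verified here. The sketch below uses a plain `List<String>` of field names and an `Object[]` row in place of `SeaTunnelRowType` and `SeaTunnelRow`. `CopyFieldSketch` is hypothetical, and appending the copied value as a new trailing field is an assumption about how `SingleFieldOutputTransform` builds the output row.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free sketch of the Copy transform's field handling.
final class CopyFieldSketch {

    private CopyFieldSketch() {
    }

    /** Resolves the source field once, failing fast and listing the available fields. */
    static int requireFieldIndex(List<String> fieldNames, String field) {
        int index = fieldNames.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException(
                "Field '" + field + "' not found, available fields: " + fieldNames);
        }
        return index;
    }

    /** Returns a new row with the value at srcIndex appended as an extra trailing field. */
    static Object[] appendCopy(Object[] row, int srcIndex) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = row[srcIndex];
        return out;
    }
}
```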




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1003411850


##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java:
##########
@@ -0,0 +1,78 @@
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        return inputRow.getField(srcFieldIndex);

Review Comment:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java#L85




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#issuecomment-1288306919

   @EricJoy2048 @ashulin @Hisoka-X @ic4y @CalvinKirs tx. PTAL



[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1010027935


##########
seatunnel-dist/pom.xml:
##########
@@ -103,6 +103,13 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <!-- transforms -->
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>seatunnel-transforms-v2</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>

Review Comment:
   It also needs to be added to the `release` profile.
   @CalvinKirs CC




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#issuecomment-1286699842

   [image: https://user-images.githubusercontent.com/14371345/197162922-1603422b-73a8-4d07-b157-10b7cd755876.png]
   



[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#issuecomment-1288695600

   @EricJoy2048 Should we support repartition-type transforms in this version?



[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
TaoZex commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1002882999


##########
seatunnel-dist/src/main/assembly/assembly-bin.xml:
##########
@@ -155,6 +155,19 @@
                 <include>org.apache.seatunnel:seatunnel-starter:jar</include>
             </includes>
             <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
+            <outputDirectory>/starter</outputDirectory>
+            <scope>provided</scope>
+        </dependencySet>
+
+        <!-- ============ SeaTunnel Transforms Jars ============  -->

Review Comment:
   How about using “SeaTunnel Transforms V2 Jars”?



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelTransform.class)
+public class ReplaceTransform extends SingleFieldOutputTransform {
+
+    private static final String KEY_REPLACE_FIELD = "replace_field";
+    private static final String KEY_PATTERN = "pattern";
+    private static final String KEY_REPLACEMENT = "replacement";
+    private static final String KEY_IS_REGEX = "is_regex";
+    private static final String KEY_REPLACE_FIRST = "replace_first";
+
+    private int inputFieldIndex;
+    private String replaceField;
+    private String pattern;
+    private String replacement;
+    private boolean isRegex;
+    private boolean replaceFirst;
+
+    @Override
+    public String getPluginName() {
+        return "Replace";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(KEY_REPLACE_FIELD)) {
+            throw new IllegalArgumentException("The configuration missing key: " + KEY_REPLACE_FIELD);
+        }
+        replaceField = pluginConfig.getString(KEY_REPLACE_FIELD);
+
+        if (!pluginConfig.hasPath(KEY_PATTERN)) {
+            throw new IllegalArgumentException("The configuration missing key: " + KEY_PATTERN);
+        }
+        pattern = pluginConfig.getString(KEY_PATTERN);
+
+        if (!pluginConfig.hasPath(KEY_REPLACEMENT)) {
+            throw new IllegalArgumentException("The configuration missing key: " + KEY_REPLACEMENT);
+        }
+        replacement = pluginConfig.getString(KEY_REPLACEMENT);
+
+        if (pluginConfig.hasPath(KEY_IS_REGEX)) {
+            isRegex = pluginConfig.getBoolean(KEY_IS_REGEX);
+        }
+        if (pluginConfig.hasPath(KEY_REPLACE_FIRST)) {
+            replaceFirst = pluginConfig.getBoolean(KEY_REPLACE_FIRST);
+        }
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType rowType) {
+        inputFieldIndex = rowType.indexOf(replaceField);
+    }
+
+    @Override
+    protected String getOutputFieldName() {
+        return replaceField;
+    }
+
+    @Override
+    protected SeaTunnelDataType getOutputFieldDataType() {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        Object inputFieldValue = inputRow.getField(inputFieldIndex);
+        if (inputFieldValue == null) {
+            return null;
+        }
+
+        if (isRegex) {
+            if (replaceFirst) {
+                return inputFieldValue.toString().replaceFirst(pattern, replacement);
+            }
+            return inputFieldValue.toString().replaceAll(pattern, replacement);
+        }
+        return inputFieldValue.toString().replace(pattern, replacement);

Review Comment:
   If a field contains the pattern multiple times, do we need a way to replace only the first occurrence?
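As quoted, `replace_first` only takes effect when `is_regex` is true; in literal mode `String.replace` always replaces every occurrence. If literal first-occurrence replacement is wanted, one option is to quote both the pattern and the replacement and reuse the regex engine. A hypothetical sketch covering all four combinations with standard `java.util.regex` calls (`ReplaceSketch` is not part of the PR):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the four modes implied by is_regex and replace_first.
final class ReplaceSketch {

    private ReplaceSketch() {
    }

    static String replace(String value, String pattern, String replacement,
                          boolean isRegex, boolean replaceFirst) {
        if (value == null) {
            return null;
        }
        if (isRegex) {
            return replaceFirst
                ? value.replaceFirst(pattern, replacement)
                : value.replaceAll(pattern, replacement);
        }
        if (replaceFirst) {
            // Literal first-occurrence replacement: quote the pattern and the replacement so
            // characters such as '.', '$' and '\' are not interpreted by the regex engine.
            return Pattern.compile(Pattern.quote(pattern))
                .matcher(value)
                .replaceFirst(Matcher.quoteReplacement(replacement));
        }
        // String.replace already replaces every literal occurrence.
        return value.replace(pattern, replacement);
    }
}
```

For the input `a.b.c` with pattern `.` and replacement `-`, literal replace-all yields `a-b-c`, literal replace-first yields `a-b.c`, and regex replace-first yields `-.b.c`, because `.` then matches any character.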




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#issuecomment-1288307856

   @TaoZex @TyrantLucifer @liugddx thanks. PTAL



[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
TaoZex commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1003357469


##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java:
##########
@@ -0,0 +1,107 @@
+        if (isRegex) {
+            if (replaceFirst) {
+                return inputFieldValue.toString().replaceFirst(pattern, replacement);
+            }
+            return inputFieldValue.toString().replaceAll(pattern, replacement);
+        }
+        return inputFieldValue.toString().replace(pattern, replacement);

Review Comment:
   If a field contains the pattern multiple times, do we need a way to replace only the first occurrence?




[GitHub] [incubator-seatunnel] Hisoka-X merged pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
Hisoka-X merged PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145



[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1010036486


##########
seatunnel-dist/pom.xml:
##########
@@ -103,6 +103,13 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <!-- transforms -->
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>seatunnel-transforms-v2</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>

Review Comment:
   Fixed: added it to the `release` profile.
   @ashulin 




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1002850643


##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java:
##########
@@ -52,7 +52,16 @@ public final class ContainerUtil {
     /**
      * An error occurs when the user is not a submodule of seatunnel-e2e.
      */
-    public static final String PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent().toString();
+    public static final String PROJECT_ROOT_PATH = getProjectRootPath();
+
+    private static String getProjectRootPath() {
+        String e2eRootModuleDir = "seatunnel-e2e";
+        Path path = Paths.get(System.getProperty("user.dir"));
+        while (!path.endsWith(Paths.get(e2eRootModuleDir))) {
+            path = path.getParent();
+        }
+        return path.getParent().toString();
+    }

Review Comment:
   Update how the project root path is resolved in the e2e module.
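The new loop walks up from `user.dir` until it reaches `seatunnel-e2e`. If the tests run from a directory that has no such ancestor, `getParent()` eventually returns `null` and the next `endsWith` call throws a `NullPointerException`. A hypothetical null-safe variant that fails with a descriptive message instead (`ProjectRoot` is an illustrative name, not the PR's code):

```java
import java.nio.file.Path;

// Hypothetical variant of ContainerUtil#getProjectRootPath that stops at the filesystem root.
final class ProjectRoot {

    private ProjectRoot() {
    }

    /** Returns the parent of the nearest ancestor named e2eRootModuleDir. */
    static Path find(Path start, String e2eRootModuleDir) {
        Path path = start.toAbsolutePath();
        // Path.endsWith(String) compares trailing name elements, as in the original loop.
        while (path != null && !path.endsWith(e2eRootModuleDir)) {
            path = path.getParent();
        }
        if (path == null || path.getParent() == null) {
            throw new IllegalStateException(
                "No '" + e2eRootModuleDir + "' directory above " + start);
        }
        return path.getParent();
    }
}
```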



##########
seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainersFactory;
+import org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestCaseInvocationContextProvider;
+import org.apache.seatunnel.e2e.common.junit.TestContainers;
+import org.apache.seatunnel.e2e.common.junit.TestLoggerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.Network;
+
+@ExtendWith({
+    ContainerTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark/flink isn't completed")

Review Comment:
   For now, disable FLINK and SPARK.




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#issuecomment-1296831505

   > @EricJoy2048 Should we support repartition-type transforms in this version?
   
   I suggest adding support for the repartition transform in a follow-up.



[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3145: [Feature][st-engine] Support for transform-v2 API

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3145:
URL: https://github.com/apache/incubator-seatunnel/pull/3145#discussion_r1009153778


##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java:
##########
@@ -129,6 +130,30 @@ public static Path connectorDir() {
         }
     }
 
+    /**
+     * lib Dir
+     */
+    public static Path libDir() {
+        String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+        if (StringUtils.isBlank(seatunnelHome)) {
+            seatunnelHome = appRootDir().toString();
+        }
+        return Paths.get(seatunnelHome, "lib");
+    }
+
+    /**
+     * return lib jars, which located in 'lib/*'.
+     */
+    public static List<Path> getLibJars() {
+        Path libRootDir = Common.libDir();
+        if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(libRootDir.toFile().listFiles((dir, name) -> name.endsWith(".jar")))

Review Comment:
   This method doesn't seem to support nested jars such as `lib/other/test.jar`. Maybe using `Files.walk` would be better.
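A sketch of the suggested `Files.walk` approach, assuming nested jars such as `lib/other/test.jar` should be included. `LibJars` is a hypothetical helper, not the PR's `Common.getLibJars`; the `lib` directory resolution from `SEATUNNEL_HOME` is left as in the diff. Unlike `File.listFiles`, it recurses into subdirectories, and it closes the stream it opens:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical recursive variant of Common#getLibJars, following the Files.walk suggestion.
final class LibJars {

    private LibJars() {
    }

    static List<Path> find(Path libDir) {
        // Files.isDirectory is false for missing paths, covering both original checks.
        if (!Files.isDirectory(libDir)) {
            return Collections.emptyList();
        }
        // Files.walk keeps directory handles open, so close the stream with try-with-resources.
        try (Stream<Path> paths = Files.walk(libDir)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".jar"))
                .sorted()
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to scan " + libDir, e);
        }
    }
}
```

Whether recursion is desirable is a design choice: it also picks up jars placed in subdirectories by accident.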



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -109,10 +108,22 @@ private void registerPlugin() {
                 }
             })
             .collect(Collectors.toList());
-
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
+        List<URL> libJarDependencies = Common.getLibJars().stream()

Review Comment:
   Please remove the duplicated code.
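One way to remove the duplication, sketched under assumptions: both plugin jars and lib jars go through the same "path to URL, then add to class loader" steps, so a shared helper can own them. `JarRegistration`, `toUrls` and `register` are hypothetical names, and `addUrl` stands in for `FlinkCommon.ADD_URL_TO_CLASSLOADER`, whose `BiConsumer<ClassLoader, URL>` shape is inferred from the `accept(classLoader, url)` call in the diff.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

final class JarRegistration {

    private JarRegistration() {
    }

    /** Converts jar paths to URLs in one place, so every caller shares the same error handling. */
    static List<URL> toUrls(Collection<Path> jars) {
        return jars.stream()
            .map(path -> {
                try {
                    return path.toUri().toURL();
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Invalid jar path: " + path, e);
                }
            })
            .collect(Collectors.toList());
    }

    /** Adds every URL to the class loader through a single code path. */
    static void register(ClassLoader loader, Collection<URL> urls, BiConsumer<ClassLoader, URL> addUrl) {
        urls.forEach(url -> addUrl.accept(loader, url));
    }
}
```

With such a helper, `registerPlugin()` would call `register(...)` once for the plugin URLs and once for `toUrls(Common.getLibJars())`, instead of repeating the conversion and `forEach` blocks.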



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/CopyFieldTransform.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelTransform.class)
+public class CopyFieldTransform extends SingleFieldOutputTransform {
+
+    private static final String SRC_FIELD = "src_field";
+    private static final String DEST_FIELD = "dest_field";
+
+    private String srcField;
+    private int srcFieldIndex;
+    private SeaTunnelDataType srcFieldDataType;
+    private String destField;
+
+    @Override
+    public String getPluginName() {
+        return "Copy";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(SRC_FIELD)) {
+            throw new IllegalArgumentException("The configuration missing key: " + SRC_FIELD);
+        }
+        if (!pluginConfig.hasPath(DEST_FIELD)) {
+            throw new IllegalArgumentException("The configuration missing key: " + DEST_FIELD);
+        }
+        this.srcField = pluginConfig.getString(SRC_FIELD);
+        this.destField = pluginConfig.getString(DEST_FIELD);
+    }
+
+    @Override
+    protected void setInputRowType(SeaTunnelRowType inputRowType) {
+        srcFieldIndex = inputRowType.indexOf(srcField);
+        if (srcFieldIndex == -1) {
+            throw new IllegalArgumentException("Cannot find [" + srcField + "] field in input row type");
+        }
+        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
+    }
+
+    @Override
+    protected String getOutputFieldName() {
+        return destField;
+    }
+
+    @Override
+    protected SeaTunnelDataType getOutputFieldDataType() {
+        return srcFieldDataType;
+    }
+
+    @Override
+    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+        return inputRow.getField(srcFieldIndex);

Review Comment:
   Should this use a deep copy?
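A rough sketch of what a deep copy of the field value could look like, assuming field values are either immutable (String, boxed numbers, null) or one of the mutable containers handled below. `FieldCopier.deepCopy` is a hypothetical helper, not an existing SeaTunnel API, and nested row objects are not covered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FieldCopier {

    private FieldCopier() {
    }

    /**
     * Returns a copy that shares no mutable state with the input. byte[], Object[]
     * (including typed arrays such as String[]), Map and List are copied recursively;
     * anything else is treated as immutable and returned as-is.
     */
    static Object deepCopy(Object value) {
        if (value instanceof byte[]) {
            byte[] bytes = (byte[]) value;
            return Arrays.copyOf(bytes, bytes.length);
        }
        if (value instanceof Object[]) {
            Object[] source = (Object[]) value;
            // Arrays.copyOf keeps the runtime component type, so String[] stays String[].
            // Note: an array typed as a concrete map class (e.g. HashMap[]) would reject
            // the LinkedHashMap produced below; such arrays are out of scope here.
            Object[] copy = Arrays.copyOf(source, source.length);
            for (int i = 0; i < copy.length; i++) {
                copy[i] = deepCopy(source[i]);
            }
            return copy;
        }
        if (value instanceof Map) {
            Map<Object, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                copy.put(deepCopy(entry.getKey()), deepCopy(entry.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(deepCopy(element));
            }
            return copy;
        }
        return value; // immutable or unknown type: shared as-is
    }
}
```

In `CopyFieldTransform`, `getOutputFieldValue` would then return `FieldCopier.deepCopy(inputRow.getField(srcFieldIndex))`, so later changes to the source field cannot leak into the copied field.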



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/ReplaceTransform.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelTransform.class)
+public class ReplaceTransform extends SingleFieldOutputTransform {
+
+    private static final String KEY_REPLACE_FIELD = "replace_field";
+    private static final String KEY_PATTERN = "pattern";
+    private static final String KEY_REPLACEMENT = "replacement";
+    private static final String KEY_IS_REGEX = "is_regex";
+    private static final String KEY_REPLACE_FIRST = "replace_first";
+
+    private int inputFieldIndex;
+    private String replaceField;
+    private String pattern;
+    private String replacement;
+    private boolean isRegex;
+    private boolean replaceFirst;
+
+    @Override
+    public String getPluginName() {
+        return "Replace";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(KEY_REPLACE_FIELD)) {

Review Comment:
   Just use `CheckConfigUtil.checkAllExists`
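The idea behind the suggestion is to validate all required keys in one call and report every missing key at once, instead of one `hasPath` check per key. The sketch below mimics that pattern over a plain `Map` so it stays self-contained; `ConfigCheck` and its methods are hypothetical stand-ins, the exact signature of SeaTunnel's `CheckConfigUtil.checkAllExists` should be checked in `seatunnel-common`, and treating `replace_field`, `pattern` and `replacement` as the mandatory keys is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ConfigCheck {

    private ConfigCheck() {
    }

    /** Returns every required key that is absent from the config, in the order given. */
    static List<String> missingKeys(Map<String, ?> config, String... requiredKeys) {
        return Arrays.stream(requiredKeys)
            .filter(key -> !config.containsKey(key))
            .collect(Collectors.toList());
    }

    /** Fails once, listing all missing keys, instead of stopping at the first one. */
    static void checkAllExists(Map<String, ?> config, String... requiredKeys) {
        List<String> missing = missingKeys(config, requiredKeys);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("The configuration is missing keys: " + missing);
        }
    }
}
```

Each transform's `setConfig` then shrinks to a single call such as `checkAllExists(config, KEY_REPLACE_FIELD, KEY_PATTERN, KEY_REPLACEMENT)` followed by reading the values.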



##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/SplitTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+@AutoService(SeaTunnelTransform.class)
+public class SplitTransform extends MultipleFieldOutputTransform {
+
+    private static final String KEY_SEPARATOR = "separator";
+    private static final String KEY_SPLIT_FIELD = "split_field";
+    private static final String KEY_OUTPUT_FIELDS = "output_fields";
+
+    private String separator;
+    private String splitField;
+    private int splitFieldIndex;
+    private String[] outputFields;
+    private String[] emptySplits;
+
+    @Override
+    public String getPluginName() {
+        return "Split";
+    }
+
+    @Override
+    protected void setConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(KEY_SEPARATOR)) {

Review Comment:
   Just use `CheckConfigUtil.checkAllExists`


