Posted to commits@seatunnel.apache.org by "lianghuan-xatu (via GitHub)" <gi...@apache.org> on 2023/09/22 08:27:32 UTC

[GitHub] [seatunnel] lianghuan-xatu opened a new pull request, #5542: [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic

lianghuan-xatu opened a new pull request, #5542:
URL: https://github.com/apache/seatunnel/pull/5542

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ### Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   https://github.com/apache/seatunnel/issues/5012
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released SeaTunnel versions or within the unreleased branches such as dev.
   If no, write 'No'.
   If you are adding/modifying connector documents, please follow our new specifications: https://github.com/apache/seatunnel/issues/4544.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If you are adding E2E test cases, maybe refer to https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf, here is a good example.
   -->
   
   
   ### Check list
   
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366470369


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });
+        }
+        return getLogicalDagGenerator().generate();
+    }
+
+    protected Set<ConnectorJarIdentifier> uploadPluginJarUrls(Set<URL> pluginJarUrls) {

Review Comment:
   I have made the changes, PTAL!



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });
+        }
+        return getLogicalDagGenerator().generate();
+    }
+
+    protected Set<ConnectorJarIdentifier> uploadPluginJarUrls(Set<URL> pluginJarUrls) {
+        Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+        pluginJarUrls.forEach(
+                pluginJarUrl -> {
+                    /**
+                     * TODO: Before uploading the Jar package file the server, first determine
+                     * whether the server holds the current Jar. If the server holds the same Jar
+                     * package file, there is no need for additional uploads.
+                     */
+                    ConnectorJarIdentifier connectorJarIdentifier =
+                            connectorPackageClient.uploadConnectorPluginJar(
+                                    Long.parseLong(jobConfig.getJobContext().getJobId()),
+                                    pluginJarUrl);
+                    pluginJarIdentifiers.add(connectorJarIdentifier);
+                });
+        return pluginJarIdentifiers;
+    }
+
+    private void transformActionPluginJarUrls(

Review Comment:
   I have made the changes, PTAL!





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1365248612


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   This can be done on the server side. When the client uploads a jar, the server can check whether that jar already exists. If it does, the server skips saving it and returns success to the client. That way you don't need to send a separate check request; everything is done in one request.
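
A minimal sketch of the single-request idea described in this comment, not code from the PR. The class name `DedupJarStore`, the `connector-jars/` storage prefix, and the use of SHA-256 as the identity of a jar are all assumptions made for illustration; the real server component and its digest utility may differ.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; not the PR's server-side service. */
public class DedupJarStore {
    // Hex digest of the jar bytes -> storage path already held by the server.
    private final Map<String, String> storedJars = new ConcurrentHashMap<>();
    // Counts simulated disk writes so that a skipped save is observable.
    private final AtomicInteger writes = new AtomicInteger();

    /**
     * Handles one upload request. If a jar with the same digest is already
     * stored, nothing is written and the existing path is returned, so the
     * client sees success either way and never sends a separate "exists?" call.
     */
    public String upload(String fileName, byte[] jarBytes) {
        String digest = sha256Hex(jarBytes);
        return storedJars.computeIfAbsent(
                digest,
                d -> {
                    writes.incrementAndGet(); // stands in for writing the bytes to disk
                    return "connector-jars/" + d + "/" + fileName; // assumed layout
                });
    }

    public int writeCount() {
        return writes.get();
    }

    static String sha256Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(data)) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required by the JDK spec", e);
        }
    }
}
```

Uploading the same bytes twice returns the same path and performs one write. Note that in this sketch a second upload with identical bytes but a different file name also returns the first path, which may or may not be the desired behaviour.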





Re: [PR] [Zeta][Feature] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1374711613


##########
docs/en/seatunnel-engine/optimize-job-submission.md:
##########
@@ -0,0 +1,101 @@
+---

Review Comment:
   Done, PTAL!





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1365388322


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java:
##########
@@ -62,6 +71,20 @@ public Long getJobId() {
         return jobId;
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        jarUrls.addAll(commonPluginJars);
+        jarUrls.addAll(immutablePair.getRight());
+        actions.forEach(
+                action -> {
+                    addCommonPluginJarsToAction(
+                            action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                });
+        return getLogicalDagGenerator().generate();
+    }

Review Comment:
   In the previous parsing logic, the local CommonPluginJars were passed to the MultipleTableJobConfigParser through method parameters so that the URLs of all current CommonPluginJars could be set on each Action as it was constructed. What we need to set, however, is the server-side path that is returned after the jar has been uploaded. If the CommonPluginJar URLs are set on the Action in advance, they become indistinguishable from the connector's own jar URLs in that Action, so we must upload CommonPluginJars and ConnectorPluginJars separately.
   
   Because the MultipleTableJobConfigParser is in the Core module, the upload cannot happen while the Actions are being built. Instead, we first construct the logical relationship graph of all Actions and then upload the jars by walking it recursively. We recurse through each Action, upload its ConnectorPluginJar, and set the jar's storage path on the server in the Action. Once the ConnectorPluginJar paths in all upstream and downstream Action chains have been replaced, we add the server-side path of the CommonPluginJar to each Action.
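
The two-phase order described in this comment can be sketched roughly as follows. This is an illustration only: `Node` is a hypothetical stand-in for an Action, and `upload` is a placeholder that fabricates a `server:/` path rather than calling any real client. Phase 1 walks the chain and swaps each node's local connector jar paths for server paths; phase 2 adds the uploaded common jar paths to every node, so the two kinds of jar never get mixed before upload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the upload order; not the PR's implementation. */
public class TwoPhaseJarRewrite {

    /** Stand-in for an Action: its own jar paths plus its upstream nodes. */
    public static class Node {
        public final String name;
        public final Set<String> jarPaths = new LinkedHashSet<>();
        public final List<Node> upstream = new ArrayList<>();

        public Node(String name, String... localConnectorJars) {
            this.name = name;
            jarPaths.addAll(Arrays.asList(localConnectorJars));
        }
    }

    /** Placeholder for an upload call: returns the path the server would report. */
    static String upload(String localPath) {
        return "server:/" + localPath.substring(localPath.lastIndexOf('/') + 1);
    }

    /** Phase 1: recurse through the chain, replacing local connector jar paths. */
    static void replaceConnectorJars(Node node, Set<Node> visited) {
        if (!visited.add(node)) {
            return; // already handled via another downstream node
        }
        Set<String> serverPaths = new LinkedHashSet<>();
        for (String local : node.jarPaths) {
            serverPaths.add(upload(local));
        }
        node.jarPaths.clear();
        node.jarPaths.addAll(serverPaths);
        for (Node up : node.upstream) {
            replaceConnectorJars(up, visited);
        }
    }

    /** Runs phase 1 from the given end nodes, then phase 2 for common jars. */
    public static Set<Node> rewrite(List<Node> endNodes, List<String> localCommonJars) {
        Set<Node> visited = new LinkedHashSet<>();
        for (Node end : endNodes) {
            replaceConnectorJars(end, visited);
        }
        // Phase 2: upload common jars once, then attach them to every node.
        Set<String> commonServerPaths = new LinkedHashSet<>();
        for (String local : localCommonJars) {
            commonServerPaths.add(upload(local));
        }
        for (Node node : visited) {
            node.jarPaths.addAll(commonServerPaths);
        }
        return visited;
    }
}
```

Because common jars are attached only after every connector path has been replaced, phase 1 never has to guess which entries in a node's jar set are connector jars and which are common jars.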





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366357250


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+    private static final ILogger LOGGER = Logger.getLogger(ConnectorPackageClient.class);
+
+    private final SeaTunnelHazelcastClient hazelcastClient;
+
+    public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+        checkNotNull(hazelcastClient);
+        this.hazelcastClient = hazelcastClient;
+    }
+
+    public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+            long jobId, List<URL> commonPluginJars) {
+        Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+        // Upload commonPluginJar
+        for (URL commonPluginJar : commonPluginJars) {
+            // handle the local file path
+            // origin path : /${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+            // handled path : ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            // Obtain the directory name of the relative location of the file path.
+            // for example, The path is
+            // ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+            // obtained here is the connector plugin name : JDBC

Review Comment:
   > May I ask if we can keep the function of uploading common jars to the Zeta engine?
   
   Yes, keep it, but mark it as not recommended.
   
   > When common jars do not exist in Zeta's lib, it can upload the local common jars of the client to the lib directory of all engine nodes.
   
   Do not touch the `lib` directory; this may cause exceptions after the server restarts. If the user puts a jar in `plugin/`, just upload it to `plugin/` on the server.
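
One way to keep an uploaded jar in the same relative location, and out of `lib`, is to compute its path relative to the client's home directory and resolve that against the server's home. The sketch below is an assumption-laden illustration, not code from the PR: the class `PluginPathMapper` is hypothetical, and the directory name `plugins` and the `plugins/<PluginName>/lib/<jar>` layout are taken from the path shown in the code comment in the diff.

```java
import java.nio.file.Path;

/** Hypothetical sketch; directory layout assumed from the diff's code comment. */
public class PluginPathMapper {

    /** Maps a client-side plugin jar to the same relative location on the server. */
    public static Path toServerPath(Path clientHome, Path serverHome, Path localJar) {
        Path relative = relativeToHome(clientHome, localJar);
        return serverHome.resolve(relative);
    }

    /** Returns the plugin directory name, e.g. "Jdbc" for plugins/Jdbc/lib/x.jar. */
    public static String pluginName(Path clientHome, Path localJar) {
        Path relative = relativeToHome(clientHome, localJar);
        if (relative.getNameCount() < 3) {
            throw new IllegalArgumentException("Expected plugins/<name>/...: " + localJar);
        }
        return relative.getName(1).toString();
    }

    private static Path relativeToHome(Path clientHome, Path localJar) {
        // Normalize first so that "plugins/../lib/x.jar" cannot slip through.
        Path relative = clientHome.normalize().relativize(localJar.normalize());
        if (!relative.startsWith("plugins")) {
            // Anything outside plugins/ (for example lib/) is rejected.
            throw new IllegalArgumentException("Not under plugins/: " + localJar);
        }
        return relative;
    }
}
```

With a client home of `/opt/seatunnel` and a server home of `/srv/seatunnel` (both made-up values), `/opt/seatunnel/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar` maps to `/srv/seatunnel/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar`, while a jar under `/opt/seatunnel/lib` is rejected. Working with `Path` objects also avoids trimming the leading separator from the URL path by hand.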
   





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "ruanwenjun (via GitHub)" <gi...@apache.org>.
ruanwenjun commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1362072901


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   Do we need to check whether the jar already exists on the master?





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370773603


##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
+import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.engine.core.job.AbstractJobEnvironment.getJarUrlsFromIdentifiers;
+import static org.awaitility.Awaitility.await;
+
+public class ConnectorPackageServiceTest extends AbstractSeaTunnelServerTest {
+
+    @Test
+    public void testMasterNodeActive() {
+        HazelcastInstanceImpl instance1 =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "ConnectorPackageServiceTest_testMasterNodeActive"));
+        HazelcastInstanceImpl instance2 =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "ConnectorPackageServiceTest_testMasterNodeActive"));
+
+        SeaTunnelServer server1 =
+                instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+        SeaTunnelServer server2 =
+                instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+
+        Assertions.assertTrue(server1.isMasterNode());
+        ConnectorPackageService connectorPackageService1 = server1.getConnectorPackageService();
+        Assertions.assertTrue(connectorPackageService1.isConnectorPackageServiceActive());
+

Review Comment:
   Thank you very much for raising this problem. The master node will only create the connector package service when the configuration of the connector package service is enabled. I have fixed the related issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Zeta][Feature] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1374712236


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.

Review Comment:
   I have fixed it, PTAL!





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366353639


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java:
##########
@@ -62,6 +71,20 @@ public Long getJobId() {
         return jobId;
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        jarUrls.addAll(commonPluginJars);
+        jarUrls.addAll(immutablePair.getRight());
+        actions.forEach(
+                action -> {
+                    addCommonPluginJarsToAction(
+                            action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                });
+        return getLogicalDagGenerator().generate();
+    }

Review Comment:
   I get your point.








Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1365252490


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });

Review Comment:
   ditto



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java:
##########
@@ -62,6 +71,20 @@ public Long getJobId() {
         return jobId;
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        jarUrls.addAll(commonPluginJars);
+        jarUrls.addAll(immutablePair.getRight());
+        actions.forEach(
+                action -> {
+                    addCommonPluginJarsToAction(
+                            action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                });
+        return getLogicalDagGenerator().generate();
+    }

Review Comment:
   Compared with https://github.com/apache/seatunnel/pull/5542/files#diff-466c4277b9a67418eb669f75714d0b035ef17ad838977c5556dc58b3aa38e030L108-L113, it seems you now add `commonPluginJars` to all actions and to `jarUrls`. Why make this change? Was there a problem before this PR?



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   This can be done on the server side. When the client uploads a jar, the server can check whether that jar already exists. If it does, the server skips saving it and still returns success to the client. That way you don't need to send a separate check request; everything is done in one request.
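
   A minimal sketch of that single-request idea, assuming the server keys stored jars by a content digest. The class `DedupJarStoreSketch`, its in-memory map, and the choice of SHA-256 are hypothetical and only illustrate the check; they are not the PR's actual storage code.

   ```java
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical in-memory stand-in for server-side jar storage (not SeaTunnel code). */
   final class DedupJarStoreSketch {

       // Maps the hex content digest to the stored jar bytes.
       private final Map<String, byte[]> jarsByDigest = new ConcurrentHashMap<>();

       /**
        * Handles one upload request. The jar is saved only if no jar with the same
        * content is stored yet; either way the digest is returned as the success result.
        */
       String upload(byte[] jarBytes) {
           String digest = sha256Hex(jarBytes);
           // putIfAbsent leaves an existing entry untouched, so a repeated upload is a no-op.
           jarsByDigest.putIfAbsent(digest, jarBytes.clone());
           return digest;
       }

       int storedJarCount() {
           return jarsByDigest.size();
       }

       static String sha256Hex(byte[] data) {
           try {
               byte[] hash = MessageDigest.getInstance("SHA-256").digest(data);
               StringBuilder hex = new StringBuilder(hash.length * 2);
               for (byte b : hash) {
                   hex.append(String.format("%02x", b));
               }
               return hex.toString();
           } catch (NoSuchAlgorithmException e) {
               // SHA-256 must be supported by every Java platform implementation.
               throw new IllegalStateException(e);
           }
       }
   }
   ```

   Calling `upload` twice with the same bytes returns the same digest both times and leaves a single stored copy, so the client needs no separate existence check.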



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {

Review Comment:
   ```suggestion
           if (enableUploadConnectorJarPackage) {
   ```



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+    private static final ILogger LOGGER = Logger.getLogger(ConnectorPackageClient.class);
+
+    private final SeaTunnelHazelcastClient hazelcastClient;
+
+    public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+        checkNotNull(hazelcastClient);
+        this.hazelcastClient = hazelcastClient;
+    }
+
+    public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+            long jobId, List<URL> commonPluginJars) {
+        Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+        // Upload commonPluginJar
+        for (URL commonPluginJar : commonPluginJars) {
+            // handle the local file path
+            // origin path : /${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+            // handled path : ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            // Obtain the directory name of the relative location of the file path.
+            // for example, The path is
+            // ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+            // obtained here is the connector plugin name : JDBC

Review Comment:
   This logic is not right. We are not required to use a plugin name like `Jdbc` as the directory name. It could be any name, such as `today` or `mysql`; the directory name has no relation to the plugin name. The jars under `plugins/` should be used for all connectors. (PS: in the Zeta engine, the `plugins` directory is marked as deprecated, see https://github.com/apache/seatunnel/blob/dev/plugins/README.md; users should put jars into `lib` on all nodes. I think in this situation you should not care about common jars.) But we do support defining common jars in the job env, see https://seatunnel.apache.org/docs/2.3.3/concept/JobEnvConfig#jars



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });
+        }
+        return getLogicalDagGenerator().generate();
+    }
+
+    protected Set<ConnectorJarIdentifier> uploadPluginJarUrls(Set<URL> pluginJarUrls) {
+        Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+        pluginJarUrls.forEach(
+                pluginJarUrl -> {
+                    /**
+                     * TODO: Before uploading the Jar package file the server, first determine
+                     * whether the server holds the current Jar. If the server holds the same Jar
+                     * package file, there is no need for additional uploads.
+                     */
+                    ConnectorJarIdentifier connectorJarIdentifier =
+                            connectorPackageClient.uploadConnectorPluginJar(
+                                    Long.parseLong(jobConfig.getJobContext().getJobId()),
+                                    pluginJarUrl);
+                    pluginJarIdentifiers.add(connectorJarIdentifier);
+                });
+        return pluginJarIdentifiers;
+    }
+
+    private void transformActionPluginJarUrls(

Review Comment:
   ```suggestion
       private void uploadActionPluginJar(
   ```



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });
+        }
+        return getLogicalDagGenerator().generate();
+    }
+
+    protected Set<ConnectorJarIdentifier> uploadPluginJarUrls(Set<URL> pluginJarUrls) {

Review Comment:
   You are uploading the jar, not the jar URL.
   ```suggestion
       protected Set<ConnectorJarIdentifier> uploadPluginJar(Set<URL> pluginJarUrls) {
   ```





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366581778


##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ConnectorPackageServiceIT.java:
##########
@@ -0,0 +1,148 @@
+/*

Review Comment:
   I find that all the test cases run in the same JVM instance, which can't prove that the upload logic works. Please add some e2e test cases with 1 client and 2 servers, all running in different Docker containers. You can refer to `org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer`.





Re: [PR] [Zeta][Feature] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer merged PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542




Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "ruanwenjun (via GitHub)" <gi...@apache.org>.
ruanwenjun commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1364827748


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   You can add a TODO here for checking whether the server already holds the jar before uploading it. It is not required now, but it can help reduce the request size, since most jobs probably use the same connector jars.
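
   The check-before-upload TODO could look roughly like the sketch below. The `JarServer` interface, `hasJar`, `storeJar`, and the in-memory server are hypothetical names invented for illustration, and SHA-256 is an assumed digest choice; this thread does not show SeaTunnel's real protocol for such a check.

   ```java
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.HashSet;
   import java.util.Set;

   final class UploadIfMissingSketch {

       /** Hypothetical server API, invented for this sketch. */
       interface JarServer {
           boolean hasJar(String digestHex);

           void storeJar(String digestHex, byte[] content);
       }

       /** In-memory stand-in for a server; counts the jar bytes it receives. */
       static final class InMemoryServer implements JarServer {
           private final Set<String> digests = new HashSet<>();
           long bytesReceived = 0;

           @Override
           public boolean hasJar(String digestHex) {
               return digests.contains(digestHex);
           }

           @Override
           public void storeJar(String digestHex, byte[] content) {
               digests.add(digestHex);
               bytesReceived += content.length;
           }
       }

       /** Sends the jar only when the server does not already hold the same content. */
       static boolean uploadIfMissing(JarServer server, byte[] jarBytes) {
           String digest = sha256Hex(jarBytes);
           if (server.hasJar(digest)) {
               return false; // skipped: only the short digest was sent
           }
           server.storeJar(digest, jarBytes);
           return true;
       }

       static String sha256Hex(byte[] data) {
           try {
               byte[] hash = MessageDigest.getInstance("SHA-256").digest(data);
               StringBuilder hex = new StringBuilder(hash.length * 2);
               for (byte b : hash) {
                   hex.append(String.format("%02x", b));
               }
               return hex.toString();
           } catch (NoSuchAlgorithmException e) {
               // SHA-256 must be supported by every Java platform implementation.
               throw new IllegalStateException(e);
           }
       }
   }
   ```

   When many jobs share the same connector jars, only the first submission sends the jar bytes; later ones send just the digest.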





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1364870101


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   Thank you very much for your suggestion. I have added a TODO, PTAL!





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370768256


##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ConnectorPackageServiceIT.java:
##########
@@ -0,0 +1,148 @@
+/*

Review Comment:
   I have added an e2e test that creates a SeaTunnel Zeta cluster from three Docker instances and runs the relevant tests. PTAL, thx!








Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#issuecomment-1777048926

   @TyrantLucifer I think you should take a look too.




Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1372636746


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java:
##########
@@ -40,6 +41,8 @@ public class TaskGroupImmutableInformation implements IdentifiedDataSerializable
 
     private Set<URL> jars;
 
+    private Set<ConnectorJarIdentifier> connectorJarIdentifiers;

Review Comment:
   I think we should add some comments in the code to tell other developers what the difference between the two jar collections is, and why we need both. PS: I think it would be better to just use `ConnectorJarIdentifier` for all jar collections.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1372799268


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            uploadActionPluginJar(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });

Review Comment:
   This is useless code left over from previous versions of seatunnel and can be removed.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1369999130


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
             coordinatorService.printJobDetailInfo();
         }
     }
+
+    public SeaTunnelConfig getSeaTunnelConfig() {
+        return seaTunnelConfig;
+    }
+
+    public NodeEngineImpl getNodeEngine() {
+        return nodeEngine;
+    }
+
+    public ConnectorPackageService getConnectorPackageService() {
+        int retryCount = 0;
+        if (isMasterNode()) {
+            // The hazelcast operator request invocation will retry, We must wait enough time to
+            // wait the invocation return.
+            String hazelcastInvocationMaxRetry =
+                    seaTunnelConfig
+                            .getHazelcastConfig()
+                            .getProperty("hazelcast.invocation.max.retry.count");

Review Comment:
   Please reference the existing config constants instead of hard-coding the property keys. You can refer to https://github.com/apache/seatunnel/pull/5618
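
   To illustrate the pattern being asked for, here is a self-contained sketch. The `EngineProperty` enum is hypothetical and is not SeaTunnel or Hazelcast API; in the real code the keys would come from whatever constants the project already defines (Hazelcast, for instance, exposes them on `ClusterProperty`). The default of 250 retries matches the fallback in the diff; the 500 ms pause is an assumption.

```java
import java.util.Properties;

public class ConfigKeySketch {

    /** Hypothetical holder: each key and its default are declared exactly once. */
    public enum EngineProperty {
        INVOCATION_MAX_RETRY_COUNT("hazelcast.invocation.max.retry.count", 250),
        // Assumed default pause of 500 ms between retries.
        INVOCATION_RETRY_PAUSE_MILLIS("hazelcast.invocation.retry.pause.millis", 500);

        private final String key;
        private final int defaultValue;

        EngineProperty(String key, int defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        public String getKey() {
            return key;
        }

        /** Reads the property, falling back to the declared default when it is absent. */
        public int read(Properties props) {
            String raw = props.getProperty(key);
            return raw == null ? defaultValue : Integer.parseInt(raw.trim());
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        int maxRetry = EngineProperty.INVOCATION_MAX_RETRY_COUNT.read(props) * 2;
        int pause = EngineProperty.INVOCATION_RETRY_PAUSE_MILLIS.read(props);
        System.out.println(maxRetry + " retries, " + pause + " ms pause");
    }
}
```

   Callers then never spell the key string themselves, so a typo or a changed default only has to be fixed in one place.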



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
             coordinatorService.printJobDetailInfo();
         }
     }
+
+    public SeaTunnelConfig getSeaTunnelConfig() {
+        return seaTunnelConfig;
+    }
+
+    public NodeEngineImpl getNodeEngine() {
+        return nodeEngine;
+    }
+
+    public ConnectorPackageService getConnectorPackageService() {

Review Comment:
   Why not put it into `CoordinatorService`, since connectorPackageService only works on the master node?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
             coordinatorService.printJobDetailInfo();
         }
     }
+
+    public SeaTunnelConfig getSeaTunnelConfig() {
+        return seaTunnelConfig;
+    }
+
+    public NodeEngineImpl getNodeEngine() {
+        return nodeEngine;
+    }
+
+    public ConnectorPackageService getConnectorPackageService() {
+        int retryCount = 0;
+        if (isMasterNode()) {
+            // The hazelcast operator request invocation will retry, We must wait enough time to
+            // wait the invocation return.
+            String hazelcastInvocationMaxRetry =
+                    seaTunnelConfig
+                            .getHazelcastConfig()
+                            .getProperty("hazelcast.invocation.max.retry.count");
+            int maxRetry =
+                    hazelcastInvocationMaxRetry == null
+                            ? 250 * 2
+                            : Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
+
+            String hazelcastRetryPause =
+                    seaTunnelConfig
+                            .getHazelcastConfig()
+                            .getProperty("hazelcast.invocation.retry.pause.millis");

Review Comment:
   ditto



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+    private static final ILogger LOGGER = Logger.getLogger(ConnectorPackageClient.class);
+
+    private final SeaTunnelHazelcastClient hazelcastClient;
+
+    public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+        checkNotNull(hazelcastClient);
+        this.hazelcastClient = hazelcastClient;
+    }
+
+    public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+            long jobId, List<URL> commonPluginJars) {
+        Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+        // Upload commonPluginJar
+        for (URL commonPluginJar : commonPluginJars) {
+            // handle the local file path
+            // origin path : /${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+            // handled path : ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            // Obtain the directory name of the relative location of the file path.
+            // for example, The path is
+            // ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+            // obtained here is the connector plugin name : JDBC

Review Comment:
   So you still haven't fixed this problem.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
+import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.engine.core.job.AbstractJobEnvironment.getJarUrlsFromIdentifiers;
+import static org.awaitility.Awaitility.await;
+
+public class ConnectorPackageServiceTest extends AbstractSeaTunnelServerTest {
+
+    @Test
+    public void testMasterNodeActive() {
+        HazelcastInstanceImpl instance1 =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "ConnectorPackageServiceTest_testMasterNodeActive"));
+        HazelcastInstanceImpl instance2 =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "ConnectorPackageServiceTest_testMasterNodeActive"));
+
+        SeaTunnelServer server1 =
+                instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+        SeaTunnelServer server2 =
+                instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+
+        Assertions.assertTrue(server1.isMasterNode());
+        ConnectorPackageService connectorPackageService1 = server1.getConnectorPackageService();
+        Assertions.assertTrue(connectorPackageService1.isConnectorPackageServiceActive());
+

Review Comment:
   So even when the upload jar feature is disabled, the connectorPackageService instance will still be created? I think we should create it only when this feature is enabled.
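
   A minimal sketch of that idea, under the assumption that the enable flag is known when the owning component is constructed. `PackageService` is a hypothetical stand-in for `ConnectorPackageService`, and the class names here are illustrative only.

```java
import java.util.Optional;

public class ConditionalServiceSketch {

    /** Hypothetical stand-in for ConnectorPackageService. */
    public static final class PackageService {
        public boolean isActive() {
            return true;
        }
    }

    // Stays null when the upload feature is disabled, so nothing is allocated or started.
    private final PackageService packageService;

    public ConditionalServiceSketch(boolean uploadEnabled) {
        this.packageService = uploadEnabled ? new PackageService() : null;
    }

    /** Callers must handle the disabled case explicitly instead of receiving an idle service. */
    public Optional<PackageService> getPackageService() {
        return Optional.ofNullable(packageService);
    }

    public static void main(String[] args) {
        System.out.println(new ConditionalServiceSketch(false).getPackageService().isPresent());
        System.out.println(new ConditionalServiceSketch(true).getPackageService().isPresent());
    }
}
```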



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -259,6 +259,20 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
             runningJobInfoIMap.remove(jobId);
             return;
         }
+        //        Data jobImmutableInformationData = jobInfo.getJobImmutableInformation();
+        //        JobImmutableInformation jobImmutableInformation =
+        //
+        // nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
+        //        List<ConnectorJarIdentifier> pluginJarIdentifiers =
+        //                jobImmutableInformation.getPluginJarIdentifiers();
+        //        pluginJarIdentifiers.forEach(
+        //                pluginJarIdentifier -> {
+        //                    String storagePath = pluginJarIdentifier.getStoragePath();
+        //                    if (!new File(storagePath).exists()) {
+        //                        connectorPackageHAStorage.downloadConnectorJar(jobId,
+        // pluginJarIdentifier);
+        //                    }
+        //                });

Review Comment:
   Any reason?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -113,6 +116,7 @@ public PhysicalVertex(
             int pipelineId,
             int totalPipelineNum,
             Set<URL> pluginJarsUrls,
+            Set<ConnectorJarIdentifier> connectorJarIdentifiers,

Review Comment:
   ditto



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java:
##########
@@ -40,6 +41,8 @@ public class TaskGroupImmutableInformation implements IdentifiedDataSerializable
 
     private Set<URL> jars;
 
+    private Set<ConnectorJarIdentifier> connectorJarIdentifiers;

Review Comment:
   Why do we need to maintain two different jar collections? It looks bad.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -252,9 +259,25 @@ public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImm
                         taskImmutableInfo.getExecutionId()));
         TaskGroup taskGroup = null;
         try {
+            Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+                    taskImmutableInfo.getConnectorJarIdentifiers();
             Set<URL> jars = taskImmutableInfo.getJars();
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-            if (!CollectionUtils.isEmpty(jars)) {
+            if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
+                // Prioritize obtaining the jar package file required for the current task execution
+                // from the local,
+                // if it does not exist locally, it will be downloaded from the master node.

Review Comment:
   ```suggestion
                   // Prioritize obtaining the jar package file required for the current task execution
                   // from the local, if it does not exist locally, it will be downloaded from the master node.
   ```
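
   The local-first lookup described in that comment can be sketched roughly as follows. This is not the PR's implementation: `MasterFetcher` is a hypothetical hook standing in for whatever operation downloads the jar from the master node.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalFirstJarSketch {

    /** Hypothetical hook for fetching jar bytes from the master node. */
    public interface MasterFetcher {
        byte[] fetch(String jarName) throws IOException;
    }

    /** Returns the local jar if it exists; otherwise downloads it once and stores it locally. */
    public static Path resolve(Path localDir, String jarName, MasterFetcher fetcher)
            throws IOException {
        Path local = localDir.resolve(jarName);
        if (Files.exists(local)) {
            return local; // already on this node, no network round trip
        }
        Files.createDirectories(localDir);
        Files.write(local, fetcher.fetch(jarName));
        return local;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("jars");
        MasterFetcher fetcher = name -> new byte[] {1, 2, 3};
        System.out.println(resolve(dir, "connector-fake.jar", fetcher));
    }
}
```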



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelProperties;
+import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.task.operation.DeleteConnectorJarInExecutionNode;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public abstract class AbstractConnectorJarStorageStrategy implements ConnectorJarStorageStrategy {
+
+    protected static final ILogger LOGGER =
+            Logger.getLogger(AbstractConnectorJarStorageStrategy.class);
+
+    protected static final String COMMON_PLUGIN_JAR_STORAGE_PATH = "/plugins";
+
+    protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH = "/connectors/seatunnel";
+
+    protected String storageDir;
+
+    protected final ConnectorJarStorageConfig connectorJarStorageConfig;
+
+    protected final SeaTunnelServer seaTunnelServer;
+
+    protected final NodeEngineImpl nodeEngine;
+
+    public AbstractConnectorJarStorageStrategy(
+            ConnectorJarStorageConfig connectorJarStorageConfig, SeaTunnelServer seaTunnelServer) {
+        this.seaTunnelServer = seaTunnelServer;
+        this.nodeEngine = seaTunnelServer.getNodeEngine();
+        checkNotNull(connectorJarStorageConfig);
+        this.connectorJarStorageConfig = connectorJarStorageConfig;
+        this.storageDir = getConnectorJarStorageDir();
+    }
+
+    @Override
+    public File getStorageLocation(long jobId, ConnectorJar connectorJar) {
+        checkNotNull(jobId);
+        File file = new File(getStorageLocationPath(jobId, connectorJar));
+        try {
+            Files.createDirectories(file.getParentFile().toPath());
+        } catch (IOException e) {
+            LOGGER.warning(
+                    String.format(
+                            "The creation of directories : %s for the connector jar storage path has failed.",
+                            file.getParentFile().toPath().toString()));
+        }
+        return file;
+    }
+
+    @Override
+    public ConnectorJarIdentifier getConnectorJarIdentifier(long jobId, ConnectorJar connectorJar) {
+        return ConnectorJarIdentifier.of(connectorJar, getStorageLocationPath(jobId, connectorJar));
+    }
+
+    @Override
+    public Path storageConnectorJarFileInternal(ConnectorJar connectorJar, File storageFile) {

Review Comment:
   Please use `Optional<Path>` as the return type, because you may return null.
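
   As a rough illustration of the requested signature (a sketch, not the method from this PR): the failure path returns `Optional.empty()` instead of `null`. The `storeJar` name and its parameters are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class OptionalPathSketch {

    /** Writes the jar bytes to target; an empty Optional signals that storing failed. */
    public static Optional<Path> storeJar(byte[] jarBytes, Path target) {
        try {
            Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            return Optional.of(Files.write(target, jarBytes));
        } catch (IOException e) {
            // Real code would log the cause here before reporting the failure.
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("store");
        System.out.println(storeJar(new byte[] {42}, dir.resolve("lib/a.jar")).isPresent());
    }
}
```

   The caller is then forced by the type to decide what to do when nothing was stored, for example with `ifPresent` or `orElseThrow`.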



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java:
##########
@@ -96,6 +96,7 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
         Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
+        CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();

Review Comment:
   Any reason for moving this?





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1362208254


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   If the client first sends a request to the server to check whether the connector Jar package already exists, then, when it does exist, the shared storage strategy needs a separate request API to increase that Jar package's reference count in the connector JarRefCounters. Do I need to change to this approach?
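
   One possible way around the extra API, sketched under assumptions: let a single server-side call both report whether the Jar is already stored and take a reference to it. The class below is hypothetical and only models the counter; `jarKey` stands for whatever uniquely identifies a Jar package.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class JarRefCounterSketch {

    private final ConcurrentMap<String, Integer> refCounts = new ConcurrentHashMap<>();

    /**
     * Takes one reference atomically. Returns true if the jar was already referenced,
     * in which case the client can skip the upload.
     */
    public boolean acquire(String jarKey) {
        int now = refCounts.merge(jarKey, 1, Integer::sum);
        return now > 1;
    }

    /** Drops one reference. Returns true when none remain and the file may be deleted. */
    public boolean release(String jarKey) {
        Integer left = refCounts.computeIfPresent(jarKey, (k, v) -> v > 1 ? v - 1 : null);
        return left == null;
    }

    public int count(String jarKey) {
        return refCounts.getOrDefault(jarKey, 0);
    }

    public static void main(String[] args) {
        JarRefCounterSketch counters = new JarRefCounterSketch();
        System.out.println(counters.acquire("connector-fake.jar")); // first job uploads
        System.out.println(counters.acquire("connector-fake.jar")); // second job reuses
    }
}
```

   Because the check and the increment happen in one atomic `merge`, two jobs submitting the same Jar at once cannot both conclude that it is missing.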





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#issuecomment-1766773912

   > Please add e2e testcase
   
   I have added the e2e test case, PTAL! 




Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1372545680


##########
docs/en/seatunnel-engine/optimize-job-submission.md:
##########
@@ -0,0 +1,101 @@
+---

Review Comment:
   This doc reads more like a design document than user-facing documentation. Please move it to the issue related to this PR.



##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.seatunnel;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+
+/**
+ * This class is the base class of SeatunnelEnvironment test for connector package service. The
+ * before method will create a Seatunnel Zeta cluster with connector package service enabled, and
+ * after method will close the Seatunnel Zeta cluster. You can use {@link
+ * ConnectorPackageServiceContainer#executeJob} to submit a seatunnel config and run a seatunnel
+ * job.
+ */
+@NoArgsConstructor
+@Slf4j
+public class ConnectorPackageServiceContainer extends AbstractTestContainer {

Review Comment:
   Let us add the `TestContainer` SPI to enable testing on all existing test cases.
   ```suggestion
   @AutoService(TestContainer.class)
   public class ConnectorPackageServiceContainer extends AbstractTestContainer {
   ```





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370332332


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java:
##########
@@ -96,6 +96,7 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
         Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
+        CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();

Review Comment:
   Sorry, there is no need to move, there is no problem here.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1362425471


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   If the client sends a request to the server to check whether the connector Jar package exists, and the package does exist, then in shared connector Jar storage mode we need a separate API request to increment the reference count for that package. Should we switch to this approach?
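
One way to avoid the extra round trip raised above is to let the server check for the Jar and increment its reference count in a single atomic step. The sketch below is only an illustration under stated assumptions: `SharedJarRefCounter`, `register`, `acquireIfPresent`, and `release` are hypothetical names, and the Jar is assumed to be keyed by a content digest. It is not the PR's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of shared-storage reference counting; not SeaTunnel code. */
public class SharedJarRefCounter {

    // Assumed key: a digest of the Jar content. Value: number of jobs referencing the Jar.
    private final Map<String, Integer> refCounts = new ConcurrentHashMap<>();

    /** Records a reference after a Jar has been uploaded and stored. */
    public void register(String jarDigest) {
        refCounts.merge(jarDigest, 1, Integer::sum);
    }

    /**
     * Checks whether the Jar is already stored and, if so, increments its count
     * in the same atomic operation. Returns true when the upload can be skipped.
     */
    public boolean acquireIfPresent(String jarDigest) {
        return refCounts.computeIfPresent(jarDigest, (key, count) -> count + 1) != null;
    }

    /** Drops one reference and returns the remaining count; the entry is removed at zero. */
    public int release(String jarDigest) {
        Integer remaining =
                refCounts.computeIfPresent(
                        jarDigest, (key, count) -> count > 1 ? count - 1 : null);
        return remaining == null ? 0 : remaining;
    }

    /** Returns the current number of references, or 0 if the Jar is unknown. */
    public int count(String jarDigest) {
        return refCounts.getOrDefault(jarDigest, 0);
    }
}
```

With this shape, a client would call `acquireIfPresent` first and upload only when it returns false, so the existence check and the count update cannot be separated by a concurrent job finishing.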





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#issuecomment-1770297495

   Could you try `rebase on dev`? I tried reading all the modified code locally, but it was difficult to `undo commit` due to too many commits. Thanks a lot.




Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366474358


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,92 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            /**
+             * TODO: Before uploading the Jar package file the server, first determine whether the
+             * server holds the current Jar. If the server holds the same Jar package file, there is
+             * no need for additional uploads.
+             */
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            transformActionPluginJarUrls(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });
+        } else {
+            jarUrls.addAll(commonPluginJars);
+            jarUrls.addAll(immutablePair.getRight());
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
+                    });

Review Comment:
   I understand your point and have implemented the Jar package check on the server side. PTAL!
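
The TODO in the hunk above (skip the upload when the server already holds the same Jar) can be illustrated with a content digest comparison. This is a minimal sketch under assumptions: `JarDigestCheck`, `sha256Hex`, and `uploadIfAbsent` are hypothetical names, the "server" is reduced to an in-memory set, and SHA-256 is chosen for illustration only. The thread does not say which digest the PR uses.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of a digest-based "already uploaded?" check; not SeaTunnel code. */
public class JarDigestCheck {

    // Stand-in for the server-side record of stored Jars, keyed by content digest.
    private final Set<String> storedDigests = new HashSet<>();

    /** Computes the lowercase hex SHA-256 digest of the given Jar content. */
    public static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(content)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the Jar was new and had to be stored, false if it was skipped. */
    public boolean uploadIfAbsent(byte[] jarBytes) {
        return storedDigests.add(sha256Hex(jarBytes));
    }
}
```

Comparing digests rather than file names means two jobs that ship the same Jar under different local paths would still share one stored copy.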
   
   





[GitHub] [seatunnel] liugddx commented on pull request #5542: [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic

Posted by "liugddx (via GitHub)" <gi...@apache.org>.
liugddx commented on PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#issuecomment-1731062412

   Nice job. Please maintain the documentation and add test cases.




Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370354186


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -259,6 +259,20 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
             runningJobInfoIMap.remove(jobId);
             return;
         }
+        //        Data jobImmutableInformationData = jobInfo.getJobImmutableInformation();
+        //        JobImmutableInformation jobImmutableInformation =
+        //
+        // nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
+        //        List<ConnectorJarIdentifier> pluginJarIdentifiers =
+        //                jobImmutableInformation.getPluginJarIdentifiers();
+        //        pluginJarIdentifiers.forEach(
+        //                pluginJarIdentifier -> {
+        //                    String storagePath = pluginJarIdentifier.getStoragePath();
+        //                    if (!new File(storagePath).exists()) {
+        //                        connectorPackageHAStorage.downloadConnectorJar(jobId,
+        // pluginJarIdentifier);
+        //                    }
+        //                });

Review Comment:
   I will remove this unused commented-out code.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1365599370


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+    private static final ILogger LOGGER = Logger.getLogger(ConnectorPackageClient.class);
+
+    private final SeaTunnelHazelcastClient hazelcastClient;
+
+    public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+        checkNotNull(hazelcastClient);
+        this.hazelcastClient = hazelcastClient;
+    }
+
+    public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+            long jobId, List<URL> commonPluginJars) {
+        Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+        // Upload commonPluginJar
+        for (URL commonPluginJar : commonPluginJars) {
+            // handle the local file path
+            // origin path : /${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+            // handled path : ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            // Obtain the directory name of the relative location of the file path.
+            // for example, The path is
+            // ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+            // obtained here is the connector plugin name : JDBC

Review Comment:
   May I ask if we can keep the function of uploading common jars to the Zeta engine? When the common jars do not exist in Zeta's `lib`, the client can upload its local common jars to the `lib` directory of all engine nodes. This way, even if the user has not placed a jar in Zeta's `lib` on every node, the task can still be executed normally.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1362208254


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   If the client sends a request to the server to determine whether the connector Jar package exists, and it does exist, then under the shared storage strategy a separate request API is required to increase that package's reference count in the connector JarRefCounters. Do I need to change to this approach?





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1371282944


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java:
##########
@@ -40,6 +41,8 @@ public class TaskGroupImmutableInformation implements IdentifiedDataSerializable
 
     private Set<URL> jars;
 
+    private Set<ConnectorJarIdentifier> connectorJarIdentifiers;

Review Comment:
   cc @hailin0 @EricJoy2048 WDYT?





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1366473227


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java:
##########
@@ -64,6 +83,82 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage == true) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);

Review Comment:
   I understand your point and have implemented the Jar package check on the server side. PTAL!





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370274833


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java:
##########
@@ -40,6 +41,8 @@ public class TaskGroupImmutableInformation implements IdentifiedDataSerializable
 
     private Set<URL> jars;
 
+    private Set<ConnectorJarIdentifier> connectorJarIdentifiers;

Review Comment:
   A ConnectorJarIdentifier is the unique identifier of a Jar package file. It carries more information about the file, including the name of the connector plugin that uses the Jar, the type of the Jar package, and so on. If we use the identifier to obtain Jar packages from the master node or from a distributed file system (functions that have not been implemented yet), this will be much more convenient, because the root directory for Jar packages on the distributed file system can be configured by the user. We can generate the storage path on the distributed file system from the user-configured root directory, the Jar package name, the Jar package type, and other information, which makes storage more flexible. If we want the Jar package storage path on the distributed file system to be exactly the same as the local path, then I can remove it.
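
The path derivation described above can be sketched as follows. Everything here is an assumption made for illustration: the class `JarStoragePathSketch`, the enum values `COMMON` and `CONNECTOR`, the method `storagePath`, and the `root/type/plugin/file` layout are hypothetical and are not taken from the PR.

```java
import java.util.Locale;

/** Hypothetical sketch of deriving a storage path from identifier fields; not SeaTunnel code. */
public class JarStoragePathSketch {

    /** Assumed Jar categories; the real enum in the PR may differ. */
    public enum JarType {
        COMMON,
        CONNECTOR
    }

    /**
     * Builds "<root>/<type>/<plugin>/<file>" using forward slashes, which is the
     * separator most distributed file systems expect regardless of the local OS.
     */
    public static String storagePath(
            String rootDir, JarType type, String pluginName, String jarFileName) {
        // Tolerate a user-configured root with or without a trailing slash.
        String root =
                rootDir.endsWith("/") ? rootDir.substring(0, rootDir.length() - 1) : rootDir;
        return String.join(
                "/", root, type.name().toLowerCase(Locale.ROOT), pluginName, jarFileName);
    }
}
```

For example, under these assumptions `storagePath("/data/jars/", JarType.CONNECTOR, "Jdbc", "mysql-connector-java-5.1.32.jar")` yields `/data/jars/connector/Jdbc/mysql-connector-java-5.1.32.jar`, so changing only the configured root relocates every stored Jar consistently.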





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370307858


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
             coordinatorService.printJobDetailInfo();
         }
     }
+
+    public SeaTunnelConfig getSeaTunnelConfig() {
+        return seaTunnelConfig;
+    }
+
+    public NodeEngineImpl getNodeEngine() {
+        return nodeEngine;
+    }
+
+    public ConnectorPackageService getConnectorPackageService() {

Review Comment:
   That is because, in the previous incorrect design, initializing the ConnectorPackageService first obtained the CoordinatorService, which at that point had not yet completed its own initialization. My design here was incorrect. Thank you very much for the reminder. I will move the ConnectorPackageService into the CoordinatorService.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1370765470


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+    private static final ILogger LOGGER = Logger.getLogger(ConnectorPackageClient.class);
+
+    private final SeaTunnelHazelcastClient hazelcastClient;
+
+    public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+        checkNotNull(hazelcastClient);
+        this.hazelcastClient = hazelcastClient;
+    }
+
+    public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+            long jobId, List<URL> commonPluginJars) {
+        Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+        // Upload commonPluginJar
+        for (URL commonPluginJar : commonPluginJars) {
+            // handle the local file path
+            // origin path : /${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+            // handled path : ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            // Obtain the directory name of the relative location of the file path.
+            // for example, The path is
+            // ${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+            // obtained here is the connector plugin name : JDBC

Review Comment:
   I'm very sorry, I don't quite understand your previous description. I have made modifications to this part of the code. Could you please help me see if this logic is right? If not, I will make the necessary modifications.





Re: [PR] [ST-Engine][Design] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1372653384


##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.

Review Comment:
   ```suggestion
           // Enable upload connector jar package to engine server, automatically upload connector Jar
           // packages and dependent third-party Jar packages to the server before job execution.
           // Enabling this configuration does not require the server to hold all connector Jar packages.
   ```



##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
     }
 
+    @Override
+    protected LogicalDag getLogicalDag() {
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+        actions.addAll(immutablePair.getLeft());
+        // Enable upload connector jar package to engine server, automatically upload connector Jar
+        // packages
+        // and dependent third-party Jar packages to the server before job execution.
+        // Enabling this configuration does not require the server to hold all connector Jar
+        // packages.
+        boolean enableUploadConnectorJarPackage =
+                seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+        if (enableUploadConnectorJarPackage) {
+            Set<ConnectorJarIdentifier> commonJarIdentifiers =
+                    connectorPackageClient.uploadCommonPluginJars(
+                            Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+            uploadActionPluginJar(actions, pluginJarIdentifiers);
+            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+            connectorJarIdentifiers.addAll(commonJarIdentifiers);
+            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+            jarUrls.addAll(commonPluginJarUrls);
+            jarUrls.addAll(connectorPluginJarUrls);
+            actions.forEach(
+                    action -> {
+                        addCommonPluginJarsToAction(
+                                action, commonPluginJarUrls, commonJarIdentifiers);
+                    });
+            actions.forEach(
+                    action -> {
+                        org.apache.seatunnel.engine.core.dag.actions.Config config =
+                                action.getConfig();
+                    });

Review Comment:
   Is this part right?
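
   As quoted, the second `forEach` only reads `action.getConfig()` into a local variable and then discards it, so it has no observable effect on the actions. A minimal sketch of that behavior, using hypothetical stand-in types (`FakeAction` and its `getJarUrls()` are illustrative assumptions, not SeaTunnel classes), might look like this:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class NoOpLoopSketch {
       // Hypothetical stand-in for an action; NOT the SeaTunnel Action class.
       static final class FakeAction {
           private final List<String> jarUrls = new ArrayList<>();

           List<String> getJarUrls() {
               return jarUrls;
           }
       }

       // Same shape as the questioned loop: a value is read into a local and dropped.
       static void readOnlyLoop(List<FakeAction> actions) {
           actions.forEach(
                   action -> {
                       List<String> urls = action.getJarUrls();
                   });
       }

       // A loop that actually changes each action, for contrast.
       static void mutatingLoop(List<FakeAction> actions, String jarUrl) {
           actions.forEach(action -> action.getJarUrls().add(jarUrl));
       }

       public static void main(String[] args) {
           List<FakeAction> actions = new ArrayList<>();
           actions.add(new FakeAction());

           readOnlyLoop(actions);
           // Nothing was added by the read-only loop.
           System.out.println(actions.get(0).getJarUrls().size());

           mutatingLoop(actions, "file:/tmp/connector-fake.jar");
           // The mutating loop added exactly one entry.
           System.out.println(actions.get(0).getJarUrls().size());
       }
   }
   ```

   Whether the original loop was meant to do something with the config or is leftover code cannot be determined from the quoted hunk alone.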





Re: [PR] [Zeta][Feature] Optimize SeaTunnel Zeta engine Jar package upload logic [seatunnel]

Posted by "lianghuan-xatu (via GitHub)" <gi...@apache.org>.
lianghuan-xatu commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1374711887


##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container.seatunnel;
+
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+
+/**
+ * This class is the base class of SeatunnelEnvironment test for connector package service. The
+ * before method will create a Seatunnel Zeta cluster with connector package service enabled, and
+ * after method will close the Seatunnel Zeta cluster. You can use {@link
+ * ConnectorPackageServiceContainer#executeJob} to submit a seatunnel config and run a seatunnel
+ * job.
+ */
+@NoArgsConstructor
+@Slf4j
+public class ConnectorPackageServiceContainer extends AbstractTestContainer {

Review Comment:
   Done, PTAL!
   


