Posted to commits@seatunnel.apache.org by "sunxiaojian (via GitHub)" <gi...@apache.org> on 2023/04/26 13:30:56 UTC

[GitHub] [incubator-seatunnel] sunxiaojian opened a new pull request, #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

sunxiaojian opened a new pull request, #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered by tests, and in case of new features or big changes,
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered by tests, or do not need tests because:
   * [ ] If your PR adds any new JAR binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details you can refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1186674652


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   > What kind of error would occur without the delay? See if the issue can be resolved by checking the status instead of using a delay.
   
   ![image](https://user-images.githubusercontent.com/6446530/236617484-5c642c01-3873-43f0-a47c-0408ec12e0c8.png)
   
   When the MapStore is configured with initial-mode=EAGER, an exception is thrown (with LAZY it works normally), but the exception does not affect service operation.
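The first revision passes `30 * 1000` as the initial delay. A later revision passes `30` with the comment "wait 30s". `scheduleAtFixedRate(command, initialDelay, period, unit)` reads both the delay and the period in the single `TimeUnit` argument, and that argument is outside the hunk. Which value is right therefore depends on it: if the unit were `SECONDS`, `30 * 1000` would mean more than 8 hours.

The minimal sketch below derives the delay from an explicit unit. The class name, the `initialDelayIn` helper and the 100 ms period are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedMetricsScheduler {
    // Hypothetical helper: expresses a delay given in seconds in the unit passed to the
    // scheduler, so "30 seconds" cannot silently become 30 * 1000 of some other unit.
    static long initialDelayIn(TimeUnit schedulerUnit, long seconds) {
        return schedulerUnit.convert(seconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // With SECONDS as the unit, 30 * 1000 is more than eight hours.
        System.out.println("30 * 1000 s = " + TimeUnit.SECONDS.toHours(30 * 1000) + " h"); // 8 h

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        // initialDelay and period share the same TimeUnit argument (milliseconds here).
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(
                        runs::incrementAndGet, // placeholder for the metrics update
                        initialDelayIn(TimeUnit.MILLISECONDS, 1), // 1 s, expressed in ms
                        100,
                        TimeUnit.MILLISECONDS);
        Thread.sleep(1500);
        System.out.println("runs so far: " + runs.get());
        future.cancel(false);
        scheduler.shutdown();
    }
}
```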
   
   





[GitHub] [seatunnel] TyrantLucifer merged pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer merged PR #4683:
URL: https://github.com/apache/seatunnel/pull/4683




[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1543431860

   @ic4y PTAL




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1190729839


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   `if (NodeState.ACTIVE.equals(nodeEngine.getNode().getState()))`
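This suggestion replaces the fixed initial delay with a state check inside the periodic task: each tick either does the work or returns and lets the next tick retry.

The minimal sketch below shows that pattern. `NodeState` is a hypothetical stand-in enum, and the state comes from a `Supplier` instead of `nodeEngine.getNode().getState()`, so the example runs without Hazelcast.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ActiveStateGuard {
    // Hypothetical stand-in for the engine's node state; not the real Hazelcast enum.
    enum NodeState { ACTIVE, PASSIVE, SHUT_DOWN }

    private final Supplier<NodeState> stateSupplier;
    private final AtomicInteger updates = new AtomicInteger();
    private final AtomicInteger skipped = new AtomicInteger();

    ActiveStateGuard(Supplier<NodeState> stateSupplier) {
        this.stateSupplier = stateSupplier;
    }

    // Body of the periodic task: skip this round until the node is ACTIVE,
    // instead of relying on a fixed initial delay.
    void updateMetricsIfActive() {
        if (!NodeState.ACTIVE.equals(stateSupplier.get())) {
            skipped.incrementAndGet();
            return;
        }
        updates.incrementAndGet(); // placeholder for the real IMap write
    }

    int updates() { return updates.get(); }

    int skipped() { return skipped.get(); }

    public static void main(String[] args) {
        NodeState[] current = {NodeState.PASSIVE};
        ActiveStateGuard guard = new ActiveStateGuard(() -> current[0]);
        guard.updateMetricsIfActive(); // skipped: node not active yet
        current[0] = NodeState.ACTIVE;
        guard.updateMetricsIfActive(); // runs
        System.out.println(guard.updates() + " update(s), " + guard.skipped() + " skip(s)"); // 1 update(s), 1 skip(s)
    }
}
```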





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1185797420


##########
docs/en/seatunnel-engine/deployment.md:
##########
@@ -179,6 +179,7 @@ map:
            type: hdfs
            namespace: /tmp/seatunnel/imap
            clusterName: seatunnel-cluster
+           storage.type: hdfs

Review Comment:
   > Is there a default value? for older versions
   
   The default is `hdfs`.
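Falling back to `hdfs` keeps older configurations that omit `storage.type` working.

The small sketch below shows that fallback. It assumes the IMap `properties` block arrives as a `Map<String, Object>`; the class and method names are hypothetical, not taken from the PR.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class StorageTypeDefault {
    // Key from the deployment.md diff; the default follows the reply above.
    static final String STORAGE_TYPE_KEY = "storage.type";
    static final String DEFAULT_STORAGE_TYPE = "hdfs";

    // Missing or null values fall back to hdfs; other values are normalized to lower case.
    static String resolveStorageType(Map<String, Object> properties) {
        Object value = properties.get(STORAGE_TYPE_KEY);
        if (value == null) {
            return DEFAULT_STORAGE_TYPE;
        }
        return value.toString().trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Map<String, Object> legacy = new HashMap<>();
        legacy.put("namespace", "/tmp/seatunnel/imap");
        Map<String, Object> oss = new HashMap<>(legacy);
        oss.put(STORAGE_TYPE_KEY, "OSS");
        System.out.println(resolveStorageType(legacy)); // hdfs
        System.out.println(resolveStorageType(oss)); // oss
    }
}
```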





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189776636


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   > You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.
   
   Is there a specific method to check the loading status of MapStore, or is it determined by catching exceptions?





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1186614731


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   > What kind of error would occur without the delay? See if the issue can be resolved by checking the status instead of using a delay.
   
   Okay, I should have added some handling for this; I just don't think it's necessary to schedule the task immediately.





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1194571310


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public interface IFileWriter<T> extends AutoCloseable {
+    String FILE_NAME = "wal.txt";

Review Comment:
   > @EricJoy2048 @Hisoka-X please check
   
   Ok
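`IFileWriter` extends `AutoCloseable` and names its file `wal.txt`, but the rest of the interface is cut off in this hunk.

The sketch below shows the general shape of such a writer: an append-only, length-prefixed record log that is closed with try-with-resources. It is a hypothetical illustration that uses `java.nio` on the local disk, not the PR's Hadoop `FileSystem`-based implementation, and `LocalWalWriter`, `write` and `readAll` are invented names.

```java
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class LocalWalWriter implements AutoCloseable {
    static final String FILE_NAME = "wal.txt"; // same file name as in the diff

    private final DataOutputStream out;

    LocalWalWriter(Path dir) throws IOException {
        Files.createDirectories(dir);
        this.out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(
                dir.resolve(FILE_NAME), StandardOpenOption.CREATE, StandardOpenOption.APPEND)));
    }

    // Each record is written as a 4-byte length prefix followed by its bytes.
    void write(byte[] record) throws IOException {
        out.writeInt(record.length);
        out.write(record);
    }

    @Override
    public void close() throws IOException {
        out.close(); // flushes the buffer, then closes the file
    }

    // Reads all length-prefixed records back; used here only to check the format.
    static List<byte[]> readAll(Path dir) throws IOException {
        List<byte[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(Files.newInputStream(dir.resolve(FILE_NAME)))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException end) {
                    return records; // clean end of file
                }
                byte[] record = new byte[length];
                in.readFully(record);
                records.add(record);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("imap-wal");
        try (LocalWalWriter writer = new LocalWalWriter(dir)) {
            writer.write("key1=value1".getBytes(StandardCharsets.UTF_8));
            writer.write("key2=value2".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(readAll(dir).size() + " records"); // 2 records
    }
}
```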





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191870861


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   > `/incubator-seatunnel/pom.xml` @sunxiaojian
   
   @EricJoy2048 done





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191871004


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   @ic4y PTAL
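The guard in this hunk sizes a `CountDownLatch` to the member count, counts it down on each `STARTED` lifecycle event, and skips the update while the count is above zero.

The sketch below models only that latch logic. A hypothetical `EventBus` stands in for Hazelcast's `LifecycleService`. It also makes one assumption of the pattern visible, which is worth checking against the real lifecycle semantics: in this model, an event published before the listener is registered is never observed. The latch therefore reaches zero only if enough `STARTED` events arrive after registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class StartupLatchGate {
    // Hypothetical event source: only listeners registered at publish time are notified.
    static class EventBus {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void addListener(Consumer<String> listener) { listeners.add(listener); }

        void publish(String state) { listeners.forEach(l -> l.accept(state)); }
    }

    private final CountDownLatch started;

    StartupLatchGate(int expectedStarts, EventBus bus) {
        this.started = new CountDownLatch(expectedStarts);
        bus.addListener(state -> {
            if ("STARTED".equals(state)) {
                started.countDown();
            }
        });
    }

    // Mirrors the check in the diff: skip this round while the latch has not reached zero.
    boolean readyForMetrics() {
        return started.getCount() == 0;
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus();
        bus.publish("STARTED"); // published before the gate exists, so it is not observed
        StartupLatchGate gate = new StartupLatchGate(2, bus);
        bus.publish("STARTED");
        System.out.println("ready after 1 event: " + gate.readyForMetrics()); // false
        bus.publish("STARTED");
        System.out.println("ready after 2 events: " + gate.readyForMetrics()); // true
    }
}
```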





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1192215550


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   > It seems fine, and it's not throwing any errors now, right?
   
   Yes, no exceptions are thrown anymore. Debugging showed that the NullPointerException was caused by an uninitialized InvocationContext.





[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1523430370

   @TyrantLucifer PTAL




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189705273


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   > Please add this to `pom.xml` not in submodule pom please.
   
   Move to where?





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189718816


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   > You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.
   
   Is there a specific method to check the loading status of MapStore, or is it determined by catching exceptions?





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191855549


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   NodeEngine must first execute its start method, which initializes the InvocationContext, before getMap can be called.
   ![image](https://github.com/apache/incubator-seatunnel/assets/6446530/f294ea83-ef70-4c9e-afe5-d7a080aa7f80)
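The failure described here is an ordering problem: `getMap` dereferences a context that only `start` creates, so calling it early surfaces as a `NullPointerException`.

The minimal sketch below reproduces that ordering and makes the precondition explicit with an `IllegalStateException`. `InvocationContext`, `start`, `isStarted` and `getMap` are hypothetical stand-ins, not the engine's real classes or signatures.

```java
import java.util.HashMap;
import java.util.Map;

public class StartBeforeGetMap {
    // Hypothetical stand-in for the context that the start method initializes.
    static class InvocationContext {
        final Map<String, Map<String, Object>> maps = new HashMap<>();
    }

    private volatile InvocationContext context; // null until start() runs

    void start() {
        context = new InvocationContext();
    }

    boolean isStarted() {
        return context != null;
    }

    // Fails fast with a descriptive error instead of a bare NullPointerException.
    Map<String, Object> getMap(String name) {
        InvocationContext ctx = context;
        if (ctx == null) {
            throw new IllegalStateException("getMap(" + name + ") called before start()");
        }
        return ctx.maps.computeIfAbsent(name, n -> new HashMap<>());
    }

    public static void main(String[] args) {
        StartBeforeGetMap engine = new StartBeforeGetMap();
        try {
            engine.getMap("metrics");
        } catch (IllegalStateException e) {
            System.out.println("before start: " + e.getMessage());
        }
        engine.start();
        engine.getMap("metrics").put("job-1", 42);
        System.out.println("after start: " + engine.getMap("metrics")); // after start: {job-1=42}
    }
}
```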
   





[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1536059372

   > I suggest adding a test case like `testStreamJobRestoreInAllNodeDown` in https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
   > 
   > Because this test needs an OSS account, you can add `@Disabled` to disable it before you push the code.
   
   done




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1185825373


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.config;
+
+public enum FileConfiguration {
+    HDFS("hdfs", new HdfsConfiguration()),
+    S3("s3", new S3Configuration()),
+    OSS("oss", new OssConfiguration());
+
+    /** file system type */
+    private final String name;
+
+    /** file system configuration */
+    private final AbstractConfiguration configuration;
+
+    FileConfiguration(String name, AbstractConfiguration configuration) {
+        this.name = name;
+        this.configuration = configuration;
+    }
+
+    public AbstractConfiguration getConfiguration(String name) {

Review Comment:
   > Unused `name`?
   
   removed
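With the unused parameter removed, the enum's `name` field is still useful for mapping a `storage.type` value to a constant.

The minimal sketch below adds such a lookup. It assumes a helper named `fromName`, which is hypothetical and not part of the PR, and it replaces the real `HdfsConfiguration`/`S3Configuration`/`OssConfiguration` objects with placeholder strings because their contents are not shown.

```java
import java.util.Locale;

public class FileConfigurationLookup {
    // Simplified copy of the enum in the diff; configurations are placeholder strings.
    enum FileConfiguration {
        HDFS("hdfs", "hdfs-config"),
        S3("s3", "s3-config"),
        OSS("oss", "oss-config");

        /** file system type */
        private final String name;

        /** file system configuration (placeholder) */
        private final String configuration;

        FileConfiguration(String name, String configuration) {
            this.name = name;
            this.configuration = configuration;
        }

        String getConfiguration() {
            return configuration;
        }

        // Matches the stored name field, so a storage.type value can be passed directly.
        static FileConfiguration fromName(String storageType) {
            String wanted = storageType.trim().toLowerCase(Locale.ROOT);
            for (FileConfiguration candidate : values()) {
                if (candidate.name.equals(wanted)) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("Unsupported storage.type: " + storageType);
        }
    }

    public static void main(String[] args) {
        System.out.println(FileConfiguration.fromName("OSS").getConfiguration()); // oss-config
    }
}
```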





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191846520


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   `/incubator-seatunnel/pom.xml`



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   `/incubator-seatunnel/pom.xml` @sunxiaojian 





[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1192210072


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   It seems fine, and it's not throwing any errors now, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1194570011


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.curator.shaded.com.google.common.io.ByteStreams;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class OssWriter implements IFileWriter<IMapFileData> {
+    private FileSystem fs;
+    private Path parentPath;
+    private Path path;
+    private Serializer serializer;
+
+    private ByteBuf bf = Unpooled.buffer(1024);
+
+    // block size
+    private long blockSize = 1024 * 1024;
+
+    private AtomicLong index = new AtomicLong(0);
+
+    @Override
+    public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
+            throws IOException {
+        this.fs = fs;
+        this.serializer = serializer;
+        this.parentPath = parentPath;
+        this.path = createNewPath();
+        if (fs.exists(path)) {
+            try (FSDataInputStream fsDataInputStream = fs.open(path)) {
+                bf.writeBytes(ByteStreams.toByteArray(fsDataInputStream));
+            }
+        }
+    }
+
+    @Override
+    public String identifier() {
+        return "oss";
+    }
+
+    // TODO Synchronous write, asynchronous write can be added in the future
+    @Override
+    public void write(IMapFileData data) throws IOException {
+        byte[] bytes = serializer.serialize(data);
+        this.write(bytes);
+    }
+
+    private void write(byte[] bytes) {
+        try (FSDataOutputStream out = fs.create(path, true)) {
+            // Write to bytebuffer
+            byte[] data = WALDataUtils.wrapperBytes(bytes);
+            bf.writeBytes(data);
+
+            // Read all bytes
+            byte[] allBytes = new byte[bf.readableBytes()];
+            bf.readBytes(allBytes);
+
+            // write filesystem
+            out.write(allBytes);
+
+            // check and reset
+            checkAndSetNextScheduleRotation(allBytes.length);
+
+        } catch (Exception ex) {
+            throw new IMapStorageException(ex);
+        }
+    }
+
+    private void checkAndSetNextScheduleRotation(long allBytes) {
+        if (allBytes > blockSize) {
+            this.path = createNewPath();
+            this.bf.clear();
+        } else {
+            // reset index
+            bf.resetReaderIndex();
+        }
+    }
+
+    public Path createNewPath() {
+        return new Path(parentPath, index.incrementAndGet() + "-" + FILE_NAME);

Review Comment:
   Could the file path be duplicated when the JVM restarts and `index` loses its old value?



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/S3Writer.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class S3Writer extends OssWriter {

Review Comment:
   Why does `S3Writer` extend `OssWriter`?
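One possible answer to this question is to move the shared logic into a common base class that both writers extend, so that neither object store depends on the other. The sketch below is hypothetical: `ObjectStoreWriter`, `OssWriterSketch`, and `S3WriterSketch` are illustrative names that do not appear in the PR, the `"s3"` identifier is assumed, and real writers would also implement `IFileWriter<IMapFileData>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shared base: holds the logic both object-store writers need.
abstract class ObjectStoreWriter {
    private final List<byte[]> pending = new ArrayList<>();

    // Each concrete writer reports its own storage type.
    abstract String identifier();

    // Shared behavior lives here instead of in one of the concrete writers.
    void write(byte[] data) {
        pending.add(data.clone());
    }

    int pendingRecords() {
        return pending.size();
    }
}

// OSS and S3 are siblings; neither extends the other.
class OssWriterSketch extends ObjectStoreWriter {
    @Override
    String identifier() {
        return "oss";
    }
}

class S3WriterSketch extends ObjectStoreWriter {
    @Override
    String identifier() {
        return "s3"; // assumed identifier, for illustration only
    }
}
```

With this layout, a change to OSS-specific behavior cannot silently alter the S3 writer.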





[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1535895968

   > testStreamJobRestoreInAllNodeDown
   
   The default is hdfs, so old configurations remain compatible.
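The reply states that the storage type defaults to hdfs so that configurations written before this PR keep working. Below is a minimal sketch of that fallback. It assumes the settings arrive as a `Map<String, Object>`, like the `properties` map in `IMapFileOSSStorageTest`, and read the `storage.type` key. `StorageTypeResolver` is a hypothetical helper, not code from the PR.

```java
import java.util.Map;

final class StorageTypeResolver {
    static final String STORAGE_TYPE_KEY = "storage.type";
    static final String DEFAULT_STORAGE_TYPE = "hdfs";

    private StorageTypeResolver() {}

    // Returns the configured storage type, falling back to hdfs when the key is
    // absent or blank, so configs written before the option existed still work.
    static String resolve(Map<String, Object> properties) {
        Object value = properties.get(STORAGE_TYPE_KEY);
        if (value == null || value.toString().trim().isEmpty()) {
            return DEFAULT_STORAGE_TYPE;
        }
        return value.toString().trim();
    }
}
```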




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1535963300

   I suggest adding a test case like `testStreamJobRestoreInAllNodeDown` in https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
   
   Because this test needs an OSS account, you can add `@Disabled` to disable the test before you push the code.
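`@Disabled` turns the test off unconditionally. An alternative is to run the OSS case only when credentials are actually available. The sketch below assumes those credentials come from environment variables, and the variable names are hypothetical. With JUnit 5, the boolean it returns would typically be passed to `Assumptions.assumeTrue(...)` so the test is skipped rather than failed.

```java
import java.util.Map;

final class OssTestGate {
    // Hypothetical environment variable names; adjust to whatever CI provides.
    static final String[] REQUIRED_VARS = {
        "OSS_BUCKET_NAME", "OSS_ENDPOINT", "OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET"
    };

    private OssTestGate() {}

    // True only if every required variable is set to a non-blank value.
    static boolean ossCredentialsPresent(Map<String, String> env) {
        for (String name : REQUIRED_VARS) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }
}
```

In a test this could read `Assumptions.assumeTrue(OssTestGate.ossCredentialsPresent(System.getenv()));`.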




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1185937413


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   > Is the number too big?
   > 
   > @ic4y
   
   It should be 30 seconds
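Whether `30 * 1000` is too big depends on the `TimeUnit` passed to `scheduleAtFixedRate`, and the hunk does not show it. With `TimeUnit.MILLISECONDS`, the value is 30 seconds. With `TimeUnit.SECONDS`, it would be 30,000 seconds, which is more than eight hours. A later revision of the same line uses `30` with the comment "wait 30s". The sketch below keeps the number and its unit side by side so the intent is explicit. `MetricsScheduling` and the 1-second period are assumed values for illustration only.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class MetricsScheduling {
    // Assumed values for illustration: a 30 s initial delay and a 1 s period.
    static final long INITIAL_DELAY_SECONDS = 30;
    static final long PERIOD_SECONDS = 1;

    private MetricsScheduling() {}

    // Passing TimeUnit.SECONDS next to the numbers removes the
    // "30 versus 30 * 1000" ambiguity.
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable task) {
        return executor.scheduleAtFixedRate(
                task, INITIAL_DELAY_SECONDS, PERIOD_SECONDS, TimeUnit.SECONDS);
    }
}
```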





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1195070598


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.curator.shaded.com.google.common.io.ByteStreams;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class OssWriter implements IFileWriter<IMapFileData> {
+    private FileSystem fs;
+    private Path parentPath;
+    private Path path;
+    private Serializer serializer;
+
+    private ByteBuf bf = Unpooled.buffer(1024);
+
+    // block size
+    private long blockSize = 1024 * 1024;
+
+    private AtomicLong index = new AtomicLong(0);
+
+    @Override
+    public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
+            throws IOException {
+        this.fs = fs;
+        this.serializer = serializer;
+        this.parentPath = parentPath;
+        this.path = createNewPath();
+        if (fs.exists(path)) {
+            try (FSDataInputStream fsDataInputStream = fs.open(path)) {
+                bf.writeBytes(ByteStreams.toByteArray(fsDataInputStream));
+            }
+        }
+    }
+
+    @Override
+    public String identifier() {
+        return "oss";
+    }
+
+    // TODO Synchronous write, asynchronous write can be added in the future
+    @Override
+    public void write(IMapFileData data) throws IOException {
+        byte[] bytes = serializer.serialize(data);
+        this.write(bytes);
+    }
+
+    private void write(byte[] bytes) {
+        try (FSDataOutputStream out = fs.create(path, true)) {
+            // Write to bytebuffer
+            byte[] data = WALDataUtils.wrapperBytes(bytes);
+            bf.writeBytes(data);
+
+            // Read all bytes
+            byte[] allBytes = new byte[bf.readableBytes()];
+            bf.readBytes(allBytes);
+
+            // write filesystem
+            out.write(allBytes);
+
+            // check and reset
+            checkAndSetNextScheduleRotation(allBytes.length);
+
+        } catch (Exception ex) {
+            throw new IMapStorageException(ex);
+        }
+    }
+
+    private void checkAndSetNextScheduleRotation(long allBytes) {
+        if (allBytes > blockSize) {
+            this.path = createNewPath();
+            this.bf.clear();
+        } else {
+            // reset index
+            bf.resetReaderIndex();
+        }
+    }
+
+    public Path createNewPath() {
+        return new Path(parentPath, index.incrementAndGet() + "-" + FILE_NAME);

Review Comment:
   > Could the file path be duplicated when the JVM restarts and `index` loses its old value?
   
   No, the parent path changes every time the engine starts.
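The answer relies on the parent path changing on every start, so a counter that resets to 0 cannot reproduce an earlier file name. The sketch below illustrates that idea. It assumes the per-start directory is derived from a random UUID, which is not necessarily the scheme the PR actually uses, and it uses plain strings in place of Hadoop's `Path`. `WalPathGenerator` and the file name passed to it are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

final class WalPathGenerator {
    private final String parentPath;
    private final String fileName;
    // Starts at 0 on every JVM start, like the index field in OssWriter.
    private final AtomicLong index = new AtomicLong(0);

    // A fresh random directory per start keeps "1-<fileName>" from colliding
    // with files written before a restart, even though the counter resets.
    WalPathGenerator(String basePath, String fileName) {
        this.parentPath = basePath + "/" + UUID.randomUUID();
        this.fileName = fileName;
    }

    String parentPath() {
        return parentPath;
    }

    String createNewPath() {
        return parentPath + "/" + index.incrementAndGet() + "-" + fileName;
    }
}
```

The trade-off is that a restarted node presumably has to discover earlier directories separately when it replays old WAL data.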





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191853123


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   > I've reviewed the code, and the exception might be thrown because the current Node hasn't finished initializing. Try using nodeEngine.getNode().getState().equals(NodeState.ACTIVE) and see how it goes.
   
   @ic4y I have identified the cause of the NullPointerException; let me verify it first.





[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191168509


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   > 1. Why is it necessary for all nodes to be STARTED? Isn't it enough to just check whether the ClusterState is ACTIVE?
   >
   > 2. getLifecycleService returns the LifecycleService of the current instance, so its LifecycleListener should only receive state changes of the current node. In cluster mode, won't waitClusterStarted.getCount() always be non-zero?
   > 
   > <img alt="image" width="667" src="https://user-images.githubusercontent.com/83933160/237607445-6a09f69b-450b-429c-a17f-ab6b7a9f9a75.png">
   
   @ic4y Checking only that ClusterState is ACTIVE still throws the same exception. Let me reconsider how to solve it.
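The second review point can be reproduced in isolation. If the latch is sized to the cluster's member count but counted down only by events from the local lifecycle service, a cluster with more than one member leaves the count above zero forever. If the listener is registered after the local member has already started, even that one event may be missed. The sketch below is a hypothetical model: `LocalLifecycleService` stands in for a single member's lifecycle service and is not the Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical stand-in for one member's lifecycle service: listeners
// registered here only ever see this member's own state changes.
final class LocalLifecycleService {
    enum State { STARTING, STARTED }

    private final List<Consumer<State>> listeners = new ArrayList<>();

    void addListener(Consumer<State> listener) {
        listeners.add(listener);
    }

    void fire(State state) {
        for (Consumer<State> listener : listeners) {
            listener.accept(state);
        }
    }
}

final class LatchPitfall {
    private LatchPitfall() {}

    // Mirrors the reviewed code: latch sized to the member count, counted down
    // on STARTED events delivered by the local lifecycle service only.
    static long remainingAfterLocalStart(int memberCount) {
        LocalLifecycleService local = new LocalLifecycleService();
        CountDownLatch waitClusterStarted = new CountDownLatch(memberCount);
        local.addListener(state -> {
            if (state == LocalLifecycleService.State.STARTED) {
                waitClusterStarted.countDown();
            }
        });
        local.fire(LocalLifecycleService.State.STARTED);
        return waitClusterStarted.getCount();
    }
}
```

With one member the latch opens. With three members it stays at 2, so the `getCount() > 0` branch would return on every tick.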





[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1547086772

   @EricJoy2048 PTAL




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1190729839


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   if (NodeState.ACTIVE.equals(nodeEngine.getNode().getState()))
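The suggested check can replace a fixed initial delay: the scheduled task runs on every tick but does nothing until the local node reports ACTIVE. In the sketch below, a local enum stands in for Hazelcast's `NodeState`, and a `Supplier` stands in for `nodeEngine.getNode().getState()`. `NodeStateGuard` is a hypothetical name, and this is not the code that was merged.

```java
import java.util.function.Supplier;

final class NodeStateGuard {
    // Local stand-in for Hazelcast's NodeState; only ACTIVE matters here.
    enum NodeState { ACTIVE, PASSIVE, SHUT_DOWN }

    private final Supplier<NodeState> nodeState;
    private final Runnable updateMetrics;

    NodeStateGuard(Supplier<NodeState> nodeState, Runnable updateMetrics) {
        this.nodeState = nodeState;
        this.updateMetrics = updateMetrics;
    }

    // Called on every scheduled tick: skip quietly until this node is ACTIVE.
    // Returns whether the metrics update actually ran.
    boolean runIfActive() {
        if (!NodeState.ACTIVE.equals(nodeState.get())) {
            return false;
        }
        updateMetrics.run();
        return true;
    }
}
```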





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1524758192

   Please add a test modeled on `testStreamJobRestoreInAllNodeDown` in `https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java`.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1185722842


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.curator.shaded.com.google.common.io.ByteStreams;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class OssWriter implements IFileWriter<IMapFileData> {
+    FileSystem fs;
+    Path parentPath;
+    Path path;
+    Serializer serializer;
+
+    ByteBuf bf = Unpooled.buffer(1024);
+
+    // block size
+    long blockSize = 1024 * 1024;
+
+    AtomicLong index = new AtomicLong(0);

Review Comment:
   Add the `private` modifier to these fields.
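The fields quoted above, which the suggestion makes `private`, drive how the reviewed `OssWriter` writes data. Each `write` appends the framed entry to `bf` and re-creates the file at `path` with the whole buffer. Once the file grows past `blockSize`, the writer switches to a new path. Below is a minimal in-memory sketch of that rewrite-and-rotate pattern. A `Map` stands in for the object store, the `WALDataUtils` framing is omitted, and the class and file names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of OssWriter's write path: every write replaces
// the current object with all bytes written to it so far.
final class RewriteAndRotateWriter {
    private final Map<String, byte[]> store = new LinkedHashMap<>();
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final long blockSize;
    private long index = 0;
    private String path;

    RewriteAndRotateWriter(long blockSize) {
        this.blockSize = blockSize;
        this.path = createNewPath();
    }

    private String createNewPath() {
        index++;
        return index + "-wal";
    }

    void write(byte[] entry) {
        buffer.write(entry, 0, entry.length);
        byte[] allBytes = buffer.toByteArray();
        // Overwrite the current object with the accumulated buffer.
        store.put(path, allBytes);
        // Rotate once the current object has grown past the block size.
        if (allBytes.length > blockSize) {
            path = createNewPath();
            buffer.reset();
        }
    }

    Map<String, byte[]> objects() {
        return store;
    }
}
```

Because each write re-uploads the whole current file, the cost of a write grows with the file size up to `blockSize`. That is presumably why the block size bounds each file.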



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file;
+
+import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.condition.OS.LINUX;
+import static org.junit.jupiter.api.condition.OS.MAC;
+
+@EnabledOnOs({LINUX, MAC})
+@Disabled
+public class IMapFileOSSStorageTest {
+
+    static String OSS_BUCKET_NAME = "oss://your bucket name/";
+    static String OSS_ENDPOINT = "your oss endpoint";
+    static String OSS_ACCESS_KEY_ID = "oss accessKey id";
+    static String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";
+    private static final Configuration CONF;
+
+    private static final IMapFileStorage STORAGE;
+
+    static {
+        CONF = new Configuration();
+        CONF.set("storage.type", "oss");
+        CONF.set("fs.defaultFS", OSS_BUCKET_NAME);
+        CONF.set("fs.oss.endpoint", OSS_ENDPOINT);
+        CONF.set("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID);
+        CONF.set("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET);
+        CONF.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        CONF.set(
+                "fs.oss.credentials.provider",
+                "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+
+        STORAGE = new IMapFileStorage();
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("storage.type", "oss");
+        properties.put("oss.bucket", OSS_BUCKET_NAME);
+        properties.put("fs.oss.endpoint", OSS_ENDPOINT);
+        properties.put("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID);
+        properties.put("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET);
+        properties.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        properties.put(
+                "fs.oss.credentials.provider",
+                "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+        properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, "random");
+        properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, "/seatunnel-test/2");

Review Comment:
   Extract `/seatunnel-test/2` into a variable.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   Is the number too big?
   
   @ic4y 



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class HdfsWriter implements IFileWriter<IMapFileData> {
+
+    FSDataOutputStream out;
+
+    Serializer serializer;

Review Comment:
   ```suggestion
       private FSDataOutputStream out;
   
       private Serializer serializer;
   ```



##########
docs/en/seatunnel-engine/deployment.md:
##########
@@ -179,6 +179,7 @@ map:
            type: hdfs
            namespace: /tmp/seatunnel/imap
            clusterName: seatunnel-cluster
+           storage.type: hdfs

Review Comment:
   Is there a default value for older versions?
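The author's later reply in this thread states that the default is hdfs, which keeps older configurations compatible. Below is a minimal sketch of that fallback. It assumes the plugin properties arrive as a `Map<String, Object>`. The class and the helper `resolveStorageType` are hypothetical and not SeaTunnel's actual API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class StorageTypeResolver {
    static final String STORAGE_TYPE_KEY = "storage.type";
    // Default for configurations written before `storage.type` existed.
    static final String DEFAULT_STORAGE_TYPE = "hdfs";

    // Hypothetical helper: returns the configured type, or "hdfs" when the key is absent or blank.
    static String resolveStorageType(Map<String, Object> properties) {
        Object value = properties.get(STORAGE_TYPE_KEY);
        if (value == null || value.toString().trim().isEmpty()) {
            return DEFAULT_STORAGE_TYPE;
        }
        return value.toString().trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // An older config that only has the keys shown in the diff context.
        Map<String, Object> legacy = new HashMap<>();
        legacy.put("namespace", "/tmp/seatunnel/imap");
        legacy.put("clusterName", "seatunnel-cluster");
        System.out.println(resolveStorageType(legacy)); // hdfs

        Map<String, Object> oss = new HashMap<>(legacy);
        oss.put(STORAGE_TYPE_KEY, "oss");
        System.out.println(resolveStorageType(oss)); // oss
    }
}
```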



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public interface IFileWriter<T> extends AutoCloseable {
+    String FILE_NAME = "wal.txt";

Review Comment:
   @EricJoy2048 @Hisoka-X please check



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.config;
+
+public enum FileConfiguration {
+    HDFS("hdfs", new HdfsConfiguration()),
+    S3("s3", new S3Configuration()),
+    OSS("oss", new OssConfiguration());
+
+    /** file system type */
+    private final String name;
+
+    /** file system configuration */
+    private final AbstractConfiguration configuration;
+
+    FileConfiguration(String name, AbstractConfiguration configuration) {
+        this.name = name;
+        this.configuration = configuration;
+    }
+
+    public AbstractConfiguration getConfiguration(String name) {

Review Comment:
   Is the `name` parameter unused?
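In the visible code, `getConfiguration(String name)` takes a `name` parameter that it never uses. If the intent is to select a configuration by file-system type, one way to use the enum's `name` field is a lookup like the one below. This is a sketch under that assumption. `FileSystemType` and `fromName` are hypothetical, and the `AbstractConfiguration` payload is left out so the example stays self-contained.

```java
import java.util.Locale;

enum FileSystemType {
    HDFS("hdfs"),
    S3("s3"),
    OSS("oss");

    /** file system type, as written in the `storage.type` option */
    private final String name;

    FileSystemType(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    // Hypothetical lookup: match the configured type against each constant's `name` field.
    static FileSystemType fromName(String name) {
        String normalized = name.trim().toLowerCase(Locale.ROOT);
        for (FileSystemType type : values()) {
            if (type.name.equals(normalized)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unsupported file system type: " + name);
    }
}
```

With a lookup like this, the method no longer needs a `name` argument on each constant, because the caller resolves the constant first.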



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189383524


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.
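The suggestion here is to replace the fixed initial delay with a guard at the start of each scheduled run: if storage is not ready, log and return, and let the next tick try again. Below is a minimal sketch. It assumes readiness can be expressed as a `BooleanSupplier`. `GuardedMetricsTask` and its counters are hypothetical, and this comment does not settle how SeaTunnel would actually detect that the MapStore has finished loading.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

class GuardedMetricsTask implements Runnable {
    private static final Logger LOG = Logger.getLogger(GuardedMetricsTask.class.getName());

    private final BooleanSupplier storageReady;
    // Counts runs that did work (a stand-in for updating the IMap) and runs that were skipped.
    final AtomicInteger updates = new AtomicInteger();
    final AtomicInteger skipped = new AtomicInteger();

    GuardedMetricsTask(BooleanSupplier storageReady) {
        this.storageReady = storageReady;
    }

    @Override
    public void run() {
        if (!storageReady.getAsBoolean()) {
            // Not ready yet: log and wait for the next scheduled run instead of failing.
            LOG.warning("Storage is not ready yet, waiting for the next scheduling");
            skipped.incrementAndGet();
            return;
        }
        updates.incrementAndGet(); // placeholder for the real metrics update
    }
}
```

Scheduled with an initial delay of `0`, the first runs are skipped until readiness turns true, so startup time no longer has to be guessed.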



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189380470


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   @ic4y  PTAL



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   Please add this to `pom.xml`, not to the submodule pom.




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1190710824


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml:
##########
@@ -30,12 +30,20 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
 
+    <properties>

Review Comment:
   > > Please add this to `pom.xml` not in submodule pom please.
   > 
   > Move to where?
   
   @EricJoy2048 



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +142,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30, // Wait for MapStore loading to complete, wait 30s

Review Comment:
   > You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.
   
   Is there a specific method to check the loading status of MapStore, or is it determined by catching exceptions?




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1191850323


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   I've reviewed the code, and the exception might be thrown because the current Node hasn't finished initializing. Try using nodeEngine.getNode().getState().equals(NodeState.ACTIVE) and see how it goes.




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1192215550


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   > It seems fine, and it's not throwing any errors now, right?
   
   Yes, no exception is thrown now. Debugging showed that the NullPointerException was caused by an uninitialized InvocationContext.




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1189718816


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   > You can check whether the MapStore has finished loading within the getJobMetricsBackupInterval() method. If it hasn't finished loading, just print a log and do not perform any other actions. This way, there is no need to add a delay at this point.
   
   Is there a specific method to check the loading status of MapStore, or is it determined by catching exceptions?




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1198507915


##########
tools/dependencies/known-dependencies.txt:
##########
@@ -34,4 +34,7 @@ j2objc-annotations-1.1.jar
 jsr305-1.3.9.jar
 jsr305-3.0.0.jar
 jsr305-3.0.2.jar
-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
\ No newline at end of file
+listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar

Review Comment:
   When you add a new jar package, you first need to handle its license. You can find the licenses of the other jars in `seatunnel-dist/release-docs/LICENSE`. Once the license is handled, update `tools/dependencies/known-dependencies.txt` to fix the dependency check.
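Conceptually, the dependency check fails when a jar that ships with the distribution is missing from `tools/dependencies/known-dependencies.txt`, which is why that list is updated after the license work is done. Below is a minimal Java sketch of the comparison. It assumes both sides are plain jar file names. `KnownDependencyCheck` and `unknownJars` are hypothetical and are not SeaTunnel's actual check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class KnownDependencyCheck {
    // Returns the jars that are shipped but not listed in known-dependencies.txt.
    static List<String> unknownJars(List<String> shippedJars, List<String> knownDependencyLines) {
        Set<String> known = new HashSet<>();
        for (String line : knownDependencyLines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) { // skip blank lines
                known.add(trimmed);
            }
        }
        List<String> unknown = new ArrayList<>();
        for (String jar : shippedJars) {
            if (!known.contains(jar)) {
                unknown.add(jar);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        List<String> knownLines = Arrays.asList(
                "jsr305-3.0.2.jar",
                "listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar");
        // "example-new-dependency-1.0.jar" is a hypothetical newly added jar.
        List<String> shipped = Arrays.asList("jsr305-3.0.2.jar", "example-new-dependency-1.0.jar");
        System.out.println(unknownJars(shipped, knownLines)); // [example-new-dependency-1.0.jar]
    }
}
```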




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1190753602


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -455,6 +459,37 @@ private synchronized void updateMetricsContextInImap() {
         contextMap.putAll(finishedExecutionContexts);
         contextMap.putAll(executionContexts);
         try {
+            ClusterState clusterState =
+                    nodeEngine.getHazelcastInstance().getCluster().getClusterState();
+            if (clusterState != ClusterState.ACTIVE) {
+                logger.warning(
+                        String.format(
+                                "The cluster is not ready yet, cluster state [%s], looking forward to the next "
+                                        + "scheduling",
+                                clusterState));
+                return;
+            }
+            // Waiting for cluster startup to complete
+            if (waitClusterStarted == null) {
+                waitClusterStarted =
+                        new CountDownLatch(
+                                nodeEngine.getHazelcastInstance().getCluster().getMembers().size());
+                nodeEngine
+                        .getHazelcastInstance()
+                        .getLifecycleService()
+                        .addLifecycleListener(
+                                event -> {
+                                    if (event.getState() == LifecycleEvent.LifecycleState.STARTED) {
+                                        waitClusterStarted.countDown();
+                                    }
+                                });
+            }
+            if (waitClusterStarted.getCount() > 0) {
+                logger.warning(
+                        "The cluster is not ready yet, looking forward to the next scheduling");
+                return;
+            }
+

Review Comment:
   1. Why is it necessary for all nodes to be STARTED? Isn't it enough to check whether the ClusterState is ACTIVE?
   2. getLifecycleService() returns the lifecycle service of the current instance, so the LifecycleListener registered on it should only receive state changes of the current node. In cluster mode, wouldn't waitClusterStarted.getCount() then always stay non-zero?
   
   [Screenshot: https://github.com/apache/incubator-seatunnel/assets/83933160/6a09f69b-450b-429c-a17f-ab6b7a9f9a75]
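The second question can be reproduced with plain `java.util.concurrent`. The latch is sized to the cluster's member count, but a listener on the local instance can count down at most once, for the local node only. If the listener is registered after the local instance has already started, it may not count down at all. The sketch below assumes a three-member cluster and one local STARTED event. `LatchSizingDemo` and its helper are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

class LatchSizingDemo {
    // Simulates the diff: latch sized to the member count, counted down only by local events.
    static long remainingAfterLocalStart(int memberCount, int localStartedEvents) {
        CountDownLatch waitClusterStarted = new CountDownLatch(memberCount);
        for (int i = 0; i < localStartedEvents; i++) {
            // Only the local node's lifecycle listener fires.
            waitClusterStarted.countDown();
        }
        return waitClusterStarted.getCount();
    }

    public static void main(String[] args) {
        // Three members, one local STARTED event: `getCount() > 0` stays true, so every run returns early.
        System.out.println(remainingAfterLocalStart(3, 1)); // 2
        // Only a single-node cluster lets the latch reach zero.
        System.out.println(remainingAfterLocalStart(1, 1)); // 0
    }
}
```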
   




[GitHub] [incubator-seatunnel] sunxiaojian commented on a diff in pull request #4683: [Feature][SeaTunnel Engine IMap Storage] Add OSS support for Imap storage to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1195071263


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/S3Writer.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class S3Writer extends OssWriter {

Review Comment:
   > Why does `S3Writer` extend `OssWriter`?
   
   I made a unified abstraction.
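The reviewer is asking why S3 is modeled as a kind of OSS writer. If the goal is a unified abstraction, another option is a shared abstract base that both writers extend, so that neither store depends on the other. The sketch below assumes that design. `ObjectStorageWriter`, `storageType()`, and the in-memory buffer are hypothetical stand-ins for the real Hadoop-based write path, and the two subclasses only illustrate the shape, not the PR's actual writers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical shared base: common buffering and close logic lives here once.
abstract class ObjectStorageWriter implements AutoCloseable {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // Each store only identifies itself; OSS and S3 are siblings, not parent and child.
    abstract String storageType();

    void write(byte[] data) throws IOException {
        buffer.write(data);
    }

    int bufferedBytes() {
        return buffer.size();
    }

    @Override
    public void close() {
        // A real writer would flush the buffer to object storage here.
    }
}

class OssWriter extends ObjectStorageWriter {
    @Override
    String storageType() {
        return "oss";
    }
}

class S3Writer extends ObjectStorageWriter {
    @Override
    String storageType() {
        return "s3";
    }
}
```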




[GitHub] [incubator-seatunnel] sunxiaojian commented on pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "sunxiaojian (via GitHub)" <gi...@apache.org>.
sunxiaojian commented on PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#issuecomment-1535896446

   > Please add test reference `testStreamJobRestoreInAllNodeDown` in `https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java`
   
   The default is hdfs, which keeps old configurations compatible.



[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1186599185


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   What kind of error would occur without the delay? See if the issue can be resolved by checking the status instead of using a delay.


