You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/08 05:36:26 UTC

[incubator-seatunnel] branch dev updated: [BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bcac24ebf [BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)
bcac24ebf is described below

commit bcac24ebfce4a3825ff7a898fdb9c2433ce20285
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Mar 8 13:36:19 2023 +0800

    [BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)
---
 config/log4j2.properties                           |  3 ++
 .../connector-v2/Error-Quick-Reference-Manual.md   |  6 ++++
 .../commit/S3RedshiftSinkAggregatedCommitter.java  | 20 ++++++++------
 .../exception/S3RedshiftConnectorErrorCode.java    | 32 +++++++++++++---------
 .../server/checkpoint/CheckpointCloseReason.java   |  3 +-
 .../operation/CheckpointFinishedOperation.java     |  8 +++---
 .../server/task/SinkAggregatedCommitterTask.java   | 10 ++++++-
 7 files changed, 55 insertions(+), 27 deletions(-)

diff --git a/config/log4j2.properties b/config/log4j2.properties
index 2462a1c25..8b8d47db8 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -24,6 +24,9 @@ property.file_ttl = 7d
 
 rootLogger.level = INFO
 
+logger.zeta.name=org.apache.seatunnel.engine
+logger.zeta.level=INFO
+
 ############################ log output to console #############################
 #rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
 #rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index e18167362..9b54e555e 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -244,3 +244,9 @@ problems encountered by users.
 |----------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | EMAIL-01 | Send email failed | When users encounter this error code, it means that send email to target server failed, please adjust the network environment according to the abnormal information |
 
+## S3Redshift Connector Error Codes
+
+|     code      |        description        |                                                                                                   solution                                                                                                   |
+|---------------|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| S3RedShift-01 | Aggregate committer error | The S3Redshift sink connector first writes data to S3, then moves the files to the target S3 path, and finally uses the Redshift `COPY` command to load the data into Redshift. Check the error log to find the specific cause. |
+
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index a46c13f6c..97476fafc 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggreg
 import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
 import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -59,24 +60,27 @@ public class S3RedshiftSinkAggregatedCommitter extends FileSinkAggregatedCommitt
                     try {
                         for (Map.Entry<String, Map<String, String>> entry :
                                 aggregatedCommitInfo.getTransactionMap().entrySet()) {
-                            for (Map.Entry<String, String> tmpFileEntry :
+                            for (Map.Entry<String, String> mvFileEntry :
                                     entry.getValue().entrySet()) {
-                                String sql = convertSql(tmpFileEntry.getKey());
+                                // first rename temp file
+                                fileSystemUtils.renameFile(
+                                        mvFileEntry.getKey(), mvFileEntry.getValue(), true);
+                                String sql = convertSql(mvFileEntry.getValue());
                                 log.debug("execute redshift sql is:" + sql);
                                 RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
-                                try {
-                                    fileSystemUtils.deleteFile(tmpFileEntry.getKey());
-                                } catch (IOException e) {
-                                    log.warn("delete tmp file error:" + tmpFileEntry.getKey());
-                                }
+                                fileSystemUtils.deleteFile(mvFileEntry.getValue());
                             }
+                            // second delete transaction directory
+                            fileSystemUtils.deleteFile(entry.getKey());
                         }
-
                     } catch (Exception e) {
                         log.error("commit aggregatedCommitInfo error ", e);
                         errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
+                        throw new S3RedshiftJdbcConnectorException(
+                                S3RedshiftConnectorErrorCode.AGGREGATE_COMMIT_ERROR, e);
                     }
                 });
+        // TODO: errorAggregatedCommitInfoList is always empty here since failures are rethrown above
         return errorAggregatedCommitInfoList;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
copy to seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
index 11238b446..eeb716f8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
@@ -15,23 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.checkpoint;
+package org.apache.seatunnel.connectors.seatunnel.redshift.exception;
 
-public enum CheckpointCloseReason {
-    PIPELINE_END("Pipeline turn to end state."),
-    CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
-    CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
-    CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
-    CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
-    CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
-    private final String message;
+public enum S3RedshiftConnectorErrorCode implements SeaTunnelErrorCode {
+    AGGREGATE_COMMIT_ERROR("S3RedShift-01", "Aggregate committer error");
 
-    CheckpointCloseReason(String message) {
-        this.message = message;
+    private final String code;
+
+    private final String description;
+
+    S3RedshiftConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
     }
 
-    public String message() {
-        return message;
+    @Override
+    public String getDescription() {
+        return this.description;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 11238b446..a500eb19a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -23,7 +23,8 @@ public enum CheckpointCloseReason {
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
-    CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
+    CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
+    AGGREGATE_COMMIT_ERROR("Aggregate commit error.");
 
     private final String message;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 28be62f51..40569c987 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.engine.server.checkpoint.operation;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.Task;
@@ -34,8 +36,6 @@ import lombok.NoArgsConstructor;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
 @Getter
 @NoArgsConstructor
 public class CheckpointFinishedOperation extends TaskOperation {
@@ -94,13 +94,13 @@ public class CheckpointFinishedOperation extends TaskOperation {
                         }
                         Thread.currentThread().setContextClassLoader(classLoader);
                     } catch (Exception e) {
-                        sneakyThrow(e);
+                        throw new SeaTunnelEngineException(ExceptionUtils.getMessage(e));
                     }
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
                         Constant.OPERATION_RETRY_TIME,
-                        false,
+                        true,
                         exception ->
                                 exception instanceof TaskGroupContextNotFoundException
                                         && !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index eed806cb0..9b21c0856 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -22,12 +22,16 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import com.hazelcast.cluster.Address;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -265,8 +269,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
                     aggregatedCommitInfo.addAll(value);
                     checkpointCommitInfoMap.remove(key);
                 });
-        aggregatedCommitter.commit(aggregatedCommitInfo);
+        List<AggregatedCommitInfoT> commit = aggregatedCommitter.commit(aggregatedCommitInfo);
         tryClose(checkpointId);
+        if (!CollectionUtils.isEmpty(commit)) {
+            log.error("aggregated committer error: {}", commit.size());
+            throw new CheckpointException(CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+        }
     }
 
     @Override