You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/08 05:36:26 UTC
[incubator-seatunnel] branch dev updated: [BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bcac24ebf [BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)
bcac24ebf is described below
commit bcac24ebfce4a3825ff7a898fdb9c2433ce20285
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Mar 8 13:36:19 2023 +0800
[BugFix] Fix S3Redshift connector copy file to redshift but file not found bug (#4282)
---
config/log4j2.properties | 3 ++
.../connector-v2/Error-Quick-Reference-Manual.md | 6 ++++
.../commit/S3RedshiftSinkAggregatedCommitter.java | 20 ++++++++------
.../exception/S3RedshiftConnectorErrorCode.java | 32 +++++++++++++---------
.../server/checkpoint/CheckpointCloseReason.java | 3 +-
.../operation/CheckpointFinishedOperation.java | 8 +++---
.../server/task/SinkAggregatedCommitterTask.java | 10 ++++++-
7 files changed, 55 insertions(+), 27 deletions(-)
diff --git a/config/log4j2.properties b/config/log4j2.properties
index 2462a1c25..8b8d47db8 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -24,6 +24,9 @@ property.file_ttl = 7d
rootLogger.level = INFO
+logger.zeta.name=org.apache.seatunnel.engine
+logger.zeta.level=INFO
+
############################ log output to console #############################
#rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
#rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index e18167362..9b54e555e 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -244,3 +244,9 @@ problems encountered by users.
|----------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| EMAIL-01 | Send email failed | When users encounter this error code, it means that send email to target server failed, please adjust the network environment according to the abnormal information |
+## S3Redshift Connector Error Codes
+
+| code | description | solution |
+|---------------|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| S3RedShift-01 | Aggregate committer error | The S3Redshift sink connector writes data to S3, moves the file to the target S3 path, and then runs a `COPY` command to load the data into Redshift. Check the error log to find the specific cause. |
+
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index a46c13f6c..97476fafc 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggreg
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfig;
+import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
import org.apache.commons.lang3.StringUtils;
@@ -59,24 +60,27 @@ public class S3RedshiftSinkAggregatedCommitter extends FileSinkAggregatedCommitt
try {
for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
- for (Map.Entry<String, String> tmpFileEntry :
+ for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
- String sql = convertSql(tmpFileEntry.getKey());
+ // first rename temp file
+ fileSystemUtils.renameFile(
+ mvFileEntry.getKey(), mvFileEntry.getValue(), true);
+ String sql = convertSql(mvFileEntry.getValue());
log.debug("execute redshift sql is:" + sql);
RedshiftJdbcClient.getInstance(pluginConfig).execute(sql);
- try {
- fileSystemUtils.deleteFile(tmpFileEntry.getKey());
- } catch (IOException e) {
- log.warn("delete tmp file error:" + tmpFileEntry.getKey());
- }
+ fileSystemUtils.deleteFile(mvFileEntry.getValue());
}
+ // second delete transaction directory
+ fileSystemUtils.deleteFile(entry.getKey());
}
-
} catch (Exception e) {
log.error("commit aggregatedCommitInfo error ", e);
errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
+ throw new S3RedshiftJdbcConnectorException(
+ S3RedshiftConnectorErrorCode.AGGREGATE_COMMIT_ERROR, e);
}
});
+ // TODO errorAggregatedCommitInfoList Always empty, So return is no use
return errorAggregatedCommitInfoList;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
copy to seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
index 11238b446..eeb716f8a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/exception/S3RedshiftConnectorErrorCode.java
@@ -15,23 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.checkpoint;
+package org.apache.seatunnel.connectors.seatunnel.redshift.exception;
-public enum CheckpointCloseReason {
- PIPELINE_END("Pipeline turn to end state."),
- CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
- CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
- CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
- CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
- CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
- private final String message;
+public enum S3RedshiftConnectorErrorCode implements SeaTunnelErrorCode {
+ AGGREGATE_COMMIT_ERROR("S3RedShift-01", "Aggregate committer error");
- CheckpointCloseReason(String message) {
- this.message = message;
+ private final String code;
+
+ private final String description;
+
+ S3RedshiftConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
}
- public String message() {
- return message;
+ @Override
+ public String getDescription() {
+ return this.description;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 11238b446..a500eb19a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -23,7 +23,8 @@ public enum CheckpointCloseReason {
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
- CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error.");
+ CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."),
+ AGGREGATE_COMMIT_ERROR("Aggregate commit error.");
private final String message;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 28be62f51..40569c987 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.engine.server.checkpoint.operation;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.Task;
@@ -34,8 +36,6 @@ import lombok.NoArgsConstructor;
import java.io.IOException;
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
@Getter
@NoArgsConstructor
public class CheckpointFinishedOperation extends TaskOperation {
@@ -94,13 +94,13 @@ public class CheckpointFinishedOperation extends TaskOperation {
}
Thread.currentThread().setContextClassLoader(classLoader);
} catch (Exception e) {
- sneakyThrow(e);
+ throw new SeaTunnelEngineException(ExceptionUtils.getMessage(e));
}
return null;
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
- false,
+ true,
exception ->
exception instanceof TaskGroupContextNotFoundException
&& !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index eed806cb0..9b21c0856 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -22,12 +22,16 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointException;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
+import org.apache.commons.collections4.CollectionUtils;
+
import com.hazelcast.cluster.Address;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -265,8 +269,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>
aggregatedCommitInfo.addAll(value);
checkpointCommitInfoMap.remove(key);
});
- aggregatedCommitter.commit(aggregatedCommitInfo);
+ List<AggregatedCommitInfoT> commit = aggregatedCommitter.commit(aggregatedCommitInfo);
tryClose(checkpointId);
+ if (!CollectionUtils.isEmpty(commit)) {
+ log.error("aggregated committer error: {}", commit.size());
+ throw new CheckpointException(CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+ }
}
@Override