You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/08 10:08:13 UTC
[incubator-seatunnel] branch dev updated: [Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2e1616186 [Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)
2e1616186 is described below
commit 2e16161865a8374b7b5a3257539b848480af969d
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sat Oct 8 18:08:08 2022 +0800
[Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)
* [Bug][Connector-V2] Fix the bug of incorrect path in windows environment
* [Bug][Connector-V2] Add unit test for FileSystemUtils
* [Bug][Connector-V2] Optimize file system utils
* [Bug][Connector-V2] Replace '/' to 'File.separator'
* [Bug][Connector-V2] Fix code style
* [Bug][Connector-V2] Fix test cases
* [Improve][Connector-V2] Fix test case
* [Bug][Connector-V2] Fix test cases
* [Bug][Connector-V2] Remove test case
---
.../connectors/seatunnel/file/sink/util/FileSystemUtils.java | 10 +++++-----
.../seatunnel/file/sink/writer/AbstractWriteStrategy.java | 10 ++++++----
2 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index c37874b12..9354130bb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -39,7 +39,7 @@ public class FileSystemUtils {
public static Configuration CONF;
public static FileSystem getFileSystem(@NonNull String path) throws IOException {
- FileSystem fileSystem = FileSystem.get(URI.create(path), CONF);
+ FileSystem fileSystem = FileSystem.get(new File(path).toURI(), CONF);
fileSystem.setWriteChecksum(false);
return fileSystem;
}
@@ -88,8 +88,8 @@ public class FileSystemUtils {
log.info("Delete already file: {}", newPath);
}
}
- if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
- createDir(newName.substring(0, newName.lastIndexOf("/")));
+ if (!fileExist(newPath.getParent().toString())) {
+ createDir(newPath.getParent().toString());
}
if (fileSystem.rename(oldPath, newPath)) {
@@ -118,7 +118,7 @@ public class FileSystemUtils {
*/
public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
FileSystem fileSystem = getFileSystem(filePath);
- List<Path> pathList = new ArrayList<Path>();
+ List<Path> pathList = new ArrayList<>();
Path fileName = new Path(filePath);
FileStatus[] status = fileSystem.listStatus(fileName);
if (status != null && status.length > 0) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 1bdfefa10..2548ea92a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.regex.Matcher;
import java.util.stream.Collectors;
public abstract class AbstractWriteStrategy implements WriteStrategy {
@@ -151,7 +152,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
stringBuilder.append(partitionFieldList.get(i))
.append("=")
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
- .append("/");
+ .append(File.separator);
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
}
partitionDir = stringBuilder.toString();
@@ -246,7 +247,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
*/
public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
- String jobDir = String.join(File.separator, pathSegments) + "/";
+ String jobDir = String.join(File.separator, pathSegments) + File.separator;
try {
List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
@@ -298,7 +299,8 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
}
public String getTargetLocation(@NonNull String seaTunnelFilePath) {
- String tmpPath = seaTunnelFilePath.replaceAll(transactionDirectory, textFileSinkConfig.getPath());
- return tmpPath.replaceAll(Constant.NON_PARTITION + File.separator, "");
+ String tmpPath = seaTunnelFilePath.replaceAll(Matcher.quoteReplacement(transactionDirectory),
+ Matcher.quoteReplacement(textFileSinkConfig.getPath()));
+ return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
}
}