You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/08 10:08:13 UTC

[incubator-seatunnel] branch dev updated: [Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2e1616186 [Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)
2e1616186 is described below

commit 2e16161865a8374b7b5a3257539b848480af969d
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sat Oct 8 18:08:08 2022 +0800

    [Bug][Connector-V2][File] Fix the bug of incorrect path in windows environment (#2980)
    
    * [Bug][Connector-V2] Fix the bug of incorrect path in windows environment
    
    * [Bug][Connector-V2] Add unit test for FileSystemUtils
    
    * [Bug][Connector-V2] Optimize file system utils
    
    * [Bug][Connector-V2] Replace '/' to 'File.separator'
    
    * [Bug][Connector-V2] Fix code style
    
    * [Bug][Connector-V2] Fix test cases
    
    * [Improve][Connector-V2] Fix test case
    
    * [Bug][Connector-V2] Fix test cases
    
    * [Bug][Connector-V2] Remove test case
---
 .../connectors/seatunnel/file/sink/util/FileSystemUtils.java   | 10 +++++-----
 .../seatunnel/file/sink/writer/AbstractWriteStrategy.java      | 10 ++++++----
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index c37874b12..9354130bb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -39,7 +39,7 @@ public class FileSystemUtils {
     public static Configuration CONF;
 
     public static FileSystem getFileSystem(@NonNull String path) throws IOException {
-        FileSystem fileSystem = FileSystem.get(URI.create(path), CONF);
+        FileSystem fileSystem = FileSystem.get(new File(path).toURI(), CONF);
         fileSystem.setWriteChecksum(false);
         return fileSystem;
     }
@@ -88,8 +88,8 @@ public class FileSystemUtils {
                 log.info("Delete already file: {}", newPath);
             }
         }
-        if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
-            createDir(newName.substring(0, newName.lastIndexOf("/")));
+        if (!fileExist(newPath.getParent().toString())) {
+            createDir(newPath.getParent().toString());
         }
 
         if (fileSystem.rename(oldPath, newPath)) {
@@ -118,7 +118,7 @@ public class FileSystemUtils {
      */
     public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
         FileSystem fileSystem = getFileSystem(filePath);
-        List<Path> pathList = new ArrayList<Path>();
+        List<Path> pathList = new ArrayList<>();
         Path fileName = new Path(filePath);
         FileStatus[] status = fileSystem.listStatus(fileName);
         if (status != null && status.length > 0) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 1bdfefa10..2548ea92a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
 public abstract class AbstractWriteStrategy implements WriteStrategy {
@@ -151,7 +152,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
                 stringBuilder.append(partitionFieldList.get(i))
                         .append("=")
                         .append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
-                        .append("/");
+                        .append(File.separator);
                 vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
             }
             partitionDir = stringBuilder.toString();
@@ -246,7 +247,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
      */
     public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
         String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
-        String jobDir = String.join(File.separator, pathSegments) + "/";
+        String jobDir = String.join(File.separator, pathSegments) + File.separator;
         try {
             List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
             return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
@@ -298,7 +299,8 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     }
 
     public String getTargetLocation(@NonNull String seaTunnelFilePath) {
-        String tmpPath = seaTunnelFilePath.replaceAll(transactionDirectory, textFileSinkConfig.getPath());
-        return tmpPath.replaceAll(Constant.NON_PARTITION + File.separator, "");
+        String tmpPath = seaTunnelFilePath.replaceAll(Matcher.quoteReplacement(transactionDirectory),
+                Matcher.quoteReplacement(textFileSinkConfig.getPath()));
+        return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
     }
 }