Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:00:08 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer opened a new pull request, #2976: [Improve][Connector-V2] Support user-defined schema for text file

TyrantLucifer opened a new pull request, #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976

   ## Purpose of this pull request
   
   close #2952 
   
   ## Check list
   
   * [x] Code changes are covered with tests, or do not need tests for the following reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r990771445


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java:
##########
@@ -35,22 +41,65 @@
 import java.nio.charset.StandardCharsets;
 
 public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private String fieldDelimiter = String.valueOf('\001');
+    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+    private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+            reader.lines().forEach(line -> {
+                try {
+                    deserializationSchema.deserialize(line.getBytes(), output);
+                } catch (IOException e) {
+                    String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
+                    throw new RuntimeException(errorMsg);
+                }
+            });
         }
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(simpleSeaTunnelType)
+                .delimiter(String.valueOf('\002'))

Review Comment:
   If the user does not specify schema information in the config file, the connector should not split the lines of the target file. For example, if the file connector reads a text file generated by Hive and the delimiter is set to `\001`, it will split every line into fields. So, to keep the line content intact, I chose to set the delimiter to `\002`, which is not a common delimiter for text files.
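
The effect described in this comment can be illustrated with a minimal sketch. It does not use SeaTunnel's `TextDeserializationSchema`; the class `DelimiterDemo` and the helper `splitFields` are hypothetical names introduced only for illustration, and the sample line is made-up data in the style of a Hive text file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class DelimiterDemo {
    // Hypothetical helper, not part of SeaTunnel: splits a line on a literal
    // delimiter and keeps trailing empty fields (limit -1).
    static List<String> splitFields(String line, String delimiter) {
        return Arrays.asList(line.split(Pattern.quote(delimiter), -1));
    }

    public static void main(String[] args) {
        // A Hive-style line: three fields separated by \001 (Ctrl-A).
        String hiveLine = "1" + '\001' + "alice" + '\001' + "2022-10-05";

        // Splitting on \001 breaks the line into its three fields.
        System.out.println(splitFields(hiveLine, "\001").size()); // prints 3

        // Splitting on \002, which does not occur in the data, leaves the line whole.
        System.out.println(splitFields(hiveLine, "\002").size()); // prints 1
    }
}
```

With a delimiter that never appears in the data, each line comes back as a single field, which is the behavior wanted when no schema is configured.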





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for text file

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r985649294


##########
docs/en/connector-v2/source/FtpFile.md:
##########
@@ -51,6 +55,38 @@ The target ftp password is required
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files

Review Comment:
   Done, please review again.





[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for text file

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r985636867


##########
docs/en/connector-v2/source/FtpFile.md:
##########
@@ -51,6 +55,38 @@ The target ftp password is required
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files

Review Comment:
   Please add a description of the '\001' code.
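
For readers unfamiliar with the notation: in Java source, `'\001'` is an octal escape for the character with code 1 (U+0001, often typed as Ctrl-A). The sketch below only demonstrates that fact; `ControlCharDemo` and `describe` are hypothetical names for illustration and do not come from the pull request.

```java
public class ControlCharDemo {
    // Hypothetical helper for illustration: renders a char as its Unicode
    // code point and reports whether it is an ISO control character.
    static String describe(char c) {
        return String.format("U+%04X (ISO control: %b)", (int) c, Character.isISOControl(c));
    }

    public static void main(String[] args) {
        // The octal escape \001 denotes the character with code 1.
        System.out.println(describe('\001')); // prints U+0001 (ISO control: true)

        // \002 is the next control character, code 2.
        System.out.println(describe('\002')); // prints U+0002 (ISO control: true)
    }
}
```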





[GitHub] [incubator-seatunnel] ashulin merged pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Posted by GitBox <gi...@apache.org>.
ashulin merged PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#issuecomment-1272469125

   > Overall good. Can you add the change log to the corresponding document?
   
   I will do this in the next PR, where I will add the change logs for all the connectors I developed.




[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r990762303


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java:
##########
@@ -35,22 +41,65 @@
 import java.nio.charset.StandardCharsets;
 
 public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private String fieldDelimiter = String.valueOf('\001');
+    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+    private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+            reader.lines().forEach(line -> {
+                try {
+                    deserializationSchema.deserialize(line.getBytes(), output);
+                } catch (IOException e) {
+                    String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
+                    throw new RuntimeException(errorMsg);
+                }
+            });
         }
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(simpleSeaTunnelType)
+                .delimiter(String.valueOf('\002'))

Review Comment:
   Shouldn't the default delimiter be `\001`?





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#issuecomment-1272468412

   Overall good. Can you add the change log to the corresponding document?

