Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/09 10:44:54 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

TyrantLucifer commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r990771445


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java:
##########
@@ -35,22 +41,65 @@
 import java.nio.charset.StandardCharsets;
 
 public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private String fieldDelimiter = String.valueOf('\001');
+    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+    private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+            reader.lines().forEach(line -> {
+                try {
+                    deserializationSchema.deserialize(line.getBytes(), output);
+                } catch (IOException e) {
+                    String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
+                    throw new RuntimeException(errorMsg);
+                }
+            });
         }
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(simpleSeaTunnelType)
+                .delimiter(String.valueOf('\002'))

Review Comment:
   If the user does not specify schema information in the config file, the connector should not split each line of the target file. For example, if the file connector reads a text file generated by Hive and the delimiter is set to `\001` (Hive's default field delimiter), it will split the whole line into fields. So, to keep the line content intact, I chose to set the delimiter to `\002`, which is not a common delimiter for text files.
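
   A minimal sketch of the reasoning above, using plain `String.split` rather than the actual `TextDeserializationSchema` (the `DelimiterDemo` class and its `splitLine` helper are hypothetical names introduced only for illustration): a Hive-style line is broken into fields when split on `\001`, but stays in one piece when split on `\002`.

```java
import java.util.regex.Pattern;

public class DelimiterDemo {

    // Hypothetical helper: splits a line on a literal delimiter.
    // Pattern.quote avoids treating the delimiter as a regex;
    // the -1 limit keeps trailing empty fields.
    static String[] splitLine(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        // A line as Hive would write it, with fields separated by \001.
        String hiveLine = "1" + '\001' + "alice" + '\001' + "2022-10-09";

        // Splitting on \001 breaks the line into its three fields.
        String[] splitOnSoh = splitLine(hiveLine, String.valueOf('\001'));
        System.out.println("fields with \\001: " + splitOnSoh.length);

        // Splitting on \002 finds no match, so the whole line is one field.
        String[] splitOnStx = splitLine(hiveLine, String.valueOf('\002'));
        System.out.println("fields with \\002: " + splitOnStx.length);
    }
}
```

   The trade-off is that a file which really does contain `\002` would still be split, so this relies on `\002` being rare in text data.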


