You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/09 09:37:07 UTC

[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2976: [Improve][Connector-V2] Support user-defined schema for reading text file

Hisoka-X commented on code in PR #2976:
URL: https://github.com/apache/incubator-seatunnel/pull/2976#discussion_r990762303


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java:
##########
@@ -35,22 +41,65 @@
 import java.nio.charset.StandardCharsets;
 
 public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private String fieldDelimiter = String.valueOf('\001');
+    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+    private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+            reader.lines().forEach(line -> {
+                try {
+                    deserializationSchema.deserialize(line.getBytes(), output);
+                } catch (IOException e) {
+                    String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
+                    throw new RuntimeException(errorMsg);
+                }
+            });
         }
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(simpleSeaTunnelType)
+                .delimiter(String.valueOf('\002'))

Review Comment:
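   Should the default delimiter here be `\001` rather than `\002`? The `fieldDelimiter` field above defaults to `String.valueOf('\001')`, so using `'\002'` in this builder looks inconsistent.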



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org