Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/02/25 07:43:43 UTC

[GitHub] [incubator-seatunnel] kalencaya commented on issue #1324: [Bug] [seatunnel-connector-flink-file] can't sync jdbc type eg date、datetime column by flink JdbcSource and FileSink

kalencaya commented on issue #1324:
URL: https://github.com/apache/incubator-seatunnel/issues/1324#issuecomment-1050611529


   I found out why FileSink can't output some date columns produced by JdbcSource.
   
   JdbcSource automatically maps the JDBC date type to java.sql.Date (and likewise for the other date/time types), and FileSink then uses JsonRowOutputFormat to write the data to a file.
   Unfortunately, JsonRowOutputFormat ignores java.sql.Date, java.sql.Time, and java.sql.Timestamp fields, because SqlTimeTypeInfo is an atomic type but not a basic type, so `type.isBasicType()` returns false for it.
   I think changing the if condition from `type.isBasicType()` to `type instanceof AtomicType` is a good idea.
   
   ```
   public class JsonRowOutputFormat extends FileOutputFormat<Row> {
   
       @Override
       public void writeRecord(Row record) throws IOException {
           final JSONObject json = getJson(record, rowTypeInfo);
           byte[] bytes = json.toString().getBytes(charset);
           this.stream.write(bytes);
           this.stream.write(NEWLINE);
       }
   
       private JSONObject getJson(Row record, RowTypeInfo rowTypeInfo) {
           String[] fieldNames = rowTypeInfo.getFieldNames();
           int i = 0;
           JSONObject json = new JSONObject();
           for (String name : fieldNames) {
               Object field = record.getField(i);
               final TypeInformation type = rowTypeInfo.getTypeAt(i);
               // SqlTimeTypeInfo.isBasicType() returns false, so java.sql.Date/Time/Timestamp fields are skipped here
               if (type.isBasicType()) {
                   json.put(name, field);
               } else if (type instanceof ObjectArrayTypeInfo) {
                   ObjectArrayTypeInfo arrayTypeInfo = (ObjectArrayTypeInfo) type;
                   TypeInformation componentInfo = arrayTypeInfo.getComponentInfo();
                   JSONArray jsonArray = new JSONArray();
                   if (componentInfo instanceof RowTypeInfo) {
                       final Row[] rows = (Row[]) field;
                       for (Row r : rows) {
                           jsonArray.add(getJson(r, (RowTypeInfo) componentInfo));
                       }
                   } else {
                       jsonArray.addAll(Arrays.asList((Object[]) field));
                   }
                   json.put(name, jsonArray);
               } else if (type instanceof RowTypeInfo) {
                   RowTypeInfo typeInfo = (RowTypeInfo) type;
                   json.put(name, getJson((Row) field, typeInfo));
               }
               i++;
           }
           return json;
       }
   }
   ```
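
   To make the difference between the two conditions concrete, here is a minimal sketch that does not depend on Flink. `TypeInfo`, `AtomicType`, `BasicInfo`, and `SqlTimeInfo` are hypothetical stand-ins that only mimic how I understand `BasicTypeInfo` and `SqlTimeTypeInfo` to behave (both atomic, only the former basic); `writeWithBasicCheck` and `writeWithAtomicCheck` are illustrative names, not code from the connector.

   ```java
   import java.sql.Date;
   import java.util.LinkedHashMap;
   import java.util.Map;

   class AtomicTypeCheckSketch {

       // Hypothetical stand-ins for Flink's type classes; these are NOT the real Flink API.
       interface TypeInfo {
           boolean isBasicType();
       }

       // Marker interface playing the role of Flink's AtomicType.
       interface AtomicType {
       }

       // Stand-in for BasicTypeInfo (String, Integer, ...): basic and atomic.
       static final class BasicInfo implements TypeInfo, AtomicType {
           @Override
           public boolean isBasicType() {
               return true;
           }
       }

       // Stand-in for SqlTimeTypeInfo (java.sql.Date, Time, Timestamp): atomic but not basic.
       static final class SqlTimeInfo implements TypeInfo, AtomicType {
           @Override
           public boolean isBasicType() {
               return false;
           }
       }

       // Current condition: only fields whose type reports isBasicType() are written.
       static Map<String, Object> writeWithBasicCheck(String[] names, TypeInfo[] types, Object[] fields) {
           Map<String, Object> out = new LinkedHashMap<>();
           for (int i = 0; i < names.length; i++) {
               if (types[i].isBasicType()) {
                   out.put(names[i], fields[i]);
               }
           }
           return out;
       }

       // Proposed condition: every atomic type is written, including the SQL time types.
       static Map<String, Object> writeWithAtomicCheck(String[] names, TypeInfo[] types, Object[] fields) {
           Map<String, Object> out = new LinkedHashMap<>();
           for (int i = 0; i < names.length; i++) {
               if (types[i] instanceof AtomicType) {
                   out.put(names[i], fields[i]);
               }
           }
           return out;
       }

       public static void main(String[] args) {
           String[] names = {"id", "created"};
           TypeInfo[] types = {new BasicInfo(), new SqlTimeInfo()};
           Object[] fields = {1, Date.valueOf("2022-02-25")};

           // With the basic-type check, the "created" column is silently dropped.
           System.out.println(writeWithBasicCheck(names, types, fields));
           // With the atomic-type check, the "created" column is kept.
           System.out.println(writeWithAtomicCheck(names, types, fields));
       }
   }
   ```

   The sketch only covers which fields pass the condition. How the JSON library serializes a java.sql.Date value once it is put into the object is a separate question that this does not address.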

