Posted to dev@flink.apache.org by "巫旭阳 (Jira)" <ji...@apache.org> on 2019/12/21 12:19:00 UTC

[jira] [Created] (FLINK-15357) schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema

巫旭阳 created FLINK-15357:
---------------------------

             Summary: schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema 
                 Key: FLINK-15357
                 URL: https://issues.apache.org/jira/browse/FLINK-15357
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common, Table SQL / API
    Affects Versions: 1.9.1
         Environment: You can reproduce the bug with the following code:

public static void main(String[] args) throws Exception {
    String sql = "SELECT count(*) as cnt, age, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), age";
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
    DataStream<User> source = senv.addSource(new SourceFunction<User>() {
        @Override
        public void run(SourceContext<User> sourceContext) throws Exception {
            int i = 1000;
            String[] names = {"Hanmeimei", "Lilei"};
            while (i > 1) {
                sourceContext.collect(new User(names[i % 2], i, new Timestamp(System.currentTimeMillis())));
                Thread.sleep(10);
                i--;
            }
        }

        @Override
        public void cancel() {
        }
    });
    tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime");
    Table table = tenv.sqlQuery(sql);
    List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
    TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert(
        "{" +
        "  type: 'object'," +
        "  properties: {" +
        "    cnt: { type: 'number' }," +
        "    tumTime: { type: 'string', format: 'date-time' }" +
        "  }" +
        "}");
    RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
    TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();
    String[] fieldNames = typeInfo.getFieldNames();
    TableSchema.Builder builder = TableSchema.builder();
    for (int i = 0; i < typeInformations.length; i++) {
        builder.field(fieldNames[i], typeInformations[i]);
    }
    Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink(
        true,
        builder.build(),
        hosts,
        "aggregation",
        "data",
        "$",
        "n/a",
        new JsonRowSerializationSchema.Builder(typeInformation).build(),
        XContentType.JSON,
        new IgnoringFailureHandler(),
        new HashMap<>()
    );
    tenv.registerTableSink("aggregationTableSink", establesink);
    table.insertInto("aggregationTableSink");
}


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
 private String name;

 private Integer age;

 private Timestamp timestamp;
}
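
If the sink schema must stay as derived from the JSON schema, one possible workaround (my assumption, not a verified fix) is to cast the aggregate in the query so the query result type becomes BigDecimal and both sides agree:

```java
public class WorkaroundSketch {
    // Hypothetical workaround: cast count(*) to DECIMAL so the query result
    // type (BigDecimal) matches the sink schema derived from the JSON schema.
    static String buildQuery() {
        return "SELECT CAST(count(*) AS DECIMAL) as cnt, age, "
            + "TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime "
            + "FROM abc "
            + "GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), age";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery());
    }
}
```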
            Reporter: 巫旭阳
             Fix For: 1.9.2, 1.10.0


JsonRowSchemaConverter.convert(jsonString) maps the JSON Schema {{number}} type only to the BigDecimal DataType, but a Table created by TableEnvironmentImpl.sqlQuery(sqlString) may carry other numeric DataTypes such as Long or Integer.
When the program runs, it throws an exception like the following:
{color:#FF0000}Field types of query result and registered TableSink [XXX] do not match.{color}
{color:#FF0000}Query result schema: [cnt: Long, tumTime: Timestamp]{color}
{color:#FF0000}TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]{color}
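
The root of the mismatch can be illustrated without Flink: JSON Schema's {{number}} carries no precision information, so the converter picks the widest representation (BigDecimal), while the planner derives Long for count(*). A minimal stand-alone sketch (the jsonTypeToJavaType helper is hypothetical; it only mimics the converter's mapping, it is not Flink code):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class TypeMismatchSketch {

    // Hypothetical helper mimicking JsonRowSchemaConverter's mapping:
    // JSON Schema has no precision info, so the widest types are chosen.
    static Class<?> jsonTypeToJavaType(String jsonSchemaType) {
        switch (jsonSchemaType) {
            case "number":  return BigDecimal.class;
            case "integer": return BigInteger.class;
            default:        return String.class;
        }
    }

    public static void main(String[] args) {
        // The planner types count(*) as Long; the sink schema says BigDecimal.
        Class<?> sinkFieldType  = jsonTypeToJavaType("number");
        Class<?> queryFieldType = Long.class;
        System.out.println("sink=" + sinkFieldType.getSimpleName()
            + " query=" + queryFieldType.getSimpleName()
            + " match=" + sinkFieldType.equals(queryFieldType));
    }
}
```

The mismatch is exactly the one reported in the exception above: the schemas differ only in the type chosen for the {{cnt}} field.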



--
This message was sent by Atlassian Jira
(v8.3.4#803005)