Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2019/12/30 09:21:00 UTC

[jira] [Closed] (FLINK-15357) schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema

     [ https://issues.apache.org/jira/browse/FLINK-15357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-15357.
--------------------------------
    Resolution: Won't Fix

> schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema 
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15357
>                 URL: https://issues.apache.org/jira/browse/FLINK-15357
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Table SQL / API
>    Affects Versions: 1.9.1
>         Environment: You can reproduce the bug with the following code:
> String sql = "SELECT count(*) as cnt, age, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), age";
>  StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>  senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>  StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
>  DataStream<User> source = senv.addSource(new SourceFunction<User>() {
>  @Override
>  public void run(SourceContext<User> sourceContext) throws Exception {
>  int i = 1000;
>  String[] names = {"Hanmeimei", "Lilei"};
>  while (i > 1) {
>  sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis())));
>  Thread.sleep(10);
>  i--;
>  }
>  }
>  @Override
>  public void cancel() {
>  }
>  });
>  tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime");
>  Table table = tenv.sqlQuery(sql);
>  List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
>  TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" +
>  " type:'object'," +
>  " properties:{" +
>  " cnt: {" +
>  " type: 'number'" +
>  " }," +
>  " tumTime:{" +
>  " type:'string'," +
>  " format:'date-time'" +
>  " }" +
>  " }" +
>  "}");
>  RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
>  TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();
>  String[] fieldNames = typeInfo.getFieldNames();
>  TableSchema.Builder builder = TableSchema.builder();
>  for (int i = 0; i < typeInformations.length; i ++) {
>  builder.field(fieldNames[i], typeInformations[i]);
>  }
>  Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink(
>  true,
>  builder.build(),
>  hosts,
>  "aggregation",
>  "data",
>  "$",
>  "n/a",
>  new JsonRowSerializationSchema.Builder(typeInformation).build(),
>  XContentType.JSON,
>  new IgnoringFailureHandler(),
>  new HashMap<>()
>  );
>  tenv.registerTableSink("aggregationTableSink", establesink);
>  table.insertInto("aggregationTableSink");
> }
> @Data
> @AllArgsConstructor
> @NoArgsConstructor
> public static class User {
>  private String name;
>  private Integer age;
>  private Timestamp timestamp;
> }
>            Reporter: 巫旭阳
>            Priority: Major
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The TypeInformation schema created by JsonRowSchemaConverter.convert(jsonString) only supports BigDecimal for the JSON number type, but a Table created by TableEnvironmentImpl.sqlQuery(sqlString) may contain many other numeric data types, such as Long and Integer.
> When the program runs, it throws an exception like the one below:
> Field types of query result and registered TableSink [XXX] do not match.
> Query result schema: [cnt: Long, tumTime: Timestamp]
> TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]
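
The mismatch described above can be illustrated without Flink on the classpath. The following is a minimal sketch in plain Java, not Flink's actual validation code: the class SchemaMismatchSketch and its describe and validate helpers are hypothetical names introduced here, and the strict position-by-position type comparison is an assumption based only on the error message quoted in the report.

```java
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaMismatchSketch {

    /** Renders a schema as "[name: Type, ...]", mirroring the error text in the report. */
    static String describe(Map<String, Class<?>> schema) {
        return schema.entrySet().stream()
                .map(e -> e.getKey() + ": " + e.getValue().getSimpleName())
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /** Hypothetical strict check: returns null on a match, otherwise an error message. */
    static String validate(String sinkName,
                           Map<String, Class<?>> query,
                           Map<String, Class<?>> sink) {
        // Compare field types position by position; no implicit numeric widening.
        List<Class<?>> queryTypes = new ArrayList<>(query.values());
        List<Class<?>> sinkTypes = new ArrayList<>(sink.values());
        if (queryTypes.equals(sinkTypes)) {
            return null;
        }
        return "Field types of query result and registered TableSink ["
                + sinkName + "] do not match.\n"
                + "Query result schema: " + describe(query) + "\n"
                + "TableSink schema: " + describe(sink);
    }

    public static void main(String[] args) {
        // count(*) yields Long in the query result, as shown in the reported error.
        Map<String, Class<?>> query = new LinkedHashMap<>();
        query.put("cnt", Long.class);
        query.put("tumTime", Timestamp.class);

        // The schema derived from JSON type 'number' uses BigDecimal, per the report.
        Map<String, Class<?>> sink = new LinkedHashMap<>();
        sink.put("cnt", BigDecimal.class);
        sink.put("tumTime", Timestamp.class);

        System.out.println(validate("aggregationTableSink", query, sink));

        // Declaring the sink field as Long makes the two schemas agree.
        sink.put("cnt", Long.class);
        System.out.println(validate("aggregationTableSink", query, sink));
    }
}
```

Under this sketch, the two schemas agree only when the sink field types are declared to match the query result exactly; whether the same adjustment is practical for a schema derived from JSON is beyond what the report states.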



--
This message was sent by Atlassian Jira
(v8.3.4#803005)