You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "aven.wu" <da...@163.com> on 2019/12/17 13:20:01 UTC
[SQL] [TableAPI] Table.sqlQuery(sql) 和 tableSink 的 table schema 类型不匹配
Hi!
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [aggregationTableSink] do not match.
SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)
使用table.sqlQuery(SQL),返回的table schema 是 Query result schema: [cnt: Long, tumTime: Timestamp]。
而使用
JsonRowSchemaConverter.convert("{" +
" type:'object'," +
" properties:{" +
" cnt: {" +
" type: 'number'" +
" }," +
" tumTime:{" +
" type:'string'," +
" format:'date-time'" +
" }" +
" }" +
“}");
创建Elasticsearch6UpsertTableSink table schema 是 TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]
而且我看了 JsonRowSchemaConverter.convert 所有的数字类型都被转成BigDecimal,导致SQL返回的schema 和 json定义的schema无法匹配。
请问是我使用的问题还是说框架存在这个问题?
附上源代码:
public class AggregationFunction {
public static void main(String[] args) {
String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)";
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
DataStream<User> source = senv.addSource(new SourceFunction<User>() {
@Override
public void run(SourceContext<User> sourceContext) throws Exception {
int i = 1000;
String[] names = {"Hanmeimei", "Lilei"};
while (i > 1) {
sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis())));
Thread.sleep(10);
i--;
}
}
@Override
public void cancel() {
}
});
tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime");
Table table = tenv.sqlQuery(sql);
List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" +
" type:'object'," +
" properties:{" +
" cnt: {" +
" type: 'number'" +
" }," +
" tumTime:{" +
" type:'string'," +
" format:'date-time'" +
" }" +
" }" +
"}");
RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();
String[] fieldNames = typeInfo.getFieldNames();
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < typeInformations.length; i ++) {
builder.field(fieldNames[i], typeInformations[i]);
}
Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink(
true,
builder.build(),
hosts,
"aggregation",
"data",
"$",
"n/a",
new JsonRowSerializationSchema.Builder(typeInformation).build(),
XContentType.JSON,
new IgnoringFailureHandler(),
new HashMap<>()
);
tenv.registerTableSink("aggregationTableSink", establesink);
table.insertInto("aggregationTableSink");
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
private String name;
private Integer age;
private Timestamp timestamp;
}
}
best wish!