Posted to issues@flink.apache.org by "Rockey Cui (Jira)" <ji...@apache.org> on 2019/12/12 03:43:00 UTC
[jira] [Created] (FLINK-15212) PROCTIME attribute causes problems with timestamp times before 1900 ?
Rockey Cui created FLINK-15212:
----------------------------------
Summary: PROCTIME attribute causes problems with timestamp times before 1900 ?
Key: FLINK-15212
URL: https://issues.apache.org/jira/browse/FLINK-15212
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.9.1
Environment: flink 1.9.1
jdk1.8.0_211
idea2019.3
Reporter: Rockey Cui
A simple DataStreamSource with a timestamp field is registered as a table, with a PROCTIME attribute appended to the field list.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = env.fromElements(
    "1001,1002,adc0,1900-01-01 00:00:00.0",
    "1002,1003,adc1,1910-01-01 00:00:00.0",
    "1003,1004,adc2,1920-01-01 00:00:00.0",
    "1004,1005,adc3,1930-01-01 00:00:00.0",
    "1005,1006,adc4,1970-01-01 00:00:00.0",
    "8888,6666,adc5,1971-01-01 00:00:00.0"
);
TypeInformation<?>[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, Types.STRING, Types.SQL_TIM
String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> {
    String[] split = s.split(",");
    Row row = new Row(split.length);
    for (int i = 0; i < split.length; i++) {
        Object value = null;
        if (fieldTypes[i].equals(Types.STRING)) {
            value = split[i];
        }
        if (fieldTypes[i].equals(Types.LONG)) {
            value = Long.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.INT)) {
            value = Integer.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.DOUBLE)) {
            value = Double.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
            value = Timestamp.valueOf(split[i]);
        }
        row.setField(i, value);
    }
    //System.out.println(row.toString());
    return row;
}).returns(rowTypeInfo);
tableEnv.registerDataStream("user_click_info", stream, String.join(",", fieldNames) + ",www.proctime");
String sql = "select * from user_click_info";
Table table = tableEnv.sqlQuery(sql);
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
result.print();
table.printSchema();
tableEnv.execute("Test");
{code}
result ==>
root
|-- id: BIGINT
|-- cityId: BIGINT
|-- url: STRING
|-- clickTime: TIMESTAMP(3)
|-- www: TIMESTAMP(3) *PROCTIME*
1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 03:37:18.036
1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
8888,6666,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196
----
Without the PROCTIME attribute, the output is correct ==>
root
|-- id: BIGINT
|-- cityId: BIGINT
|-- url: STRING
|-- clickTime: TIMESTAMP(3)
1001,1002,adc0,1900-01-01 00:00:00.0
1002,1003,adc1,1910-01-01 00:00:00.0
1003,1004,adc2,1920-01-01 00:00:00.0
1004,1005,adc3,1930-01-01 00:00:00.0
1005,1006,adc4,1970-01-01 00:00:00.0
8888,6666,adc5,1971-01-01 00:00:00.0
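In the output with PROCTIME, only the first row is off, and by exactly 5 minutes 43 seconds (1900-01-01 00:00:00.0 is printed as 1899-12-31 23:54:17.0). One possible explanation, which this report does not confirm, is that the timestamp passes through a conversion that uses a time zone whose historical local-mean-time offset differs from its modern offset. The sketch below assumes the JVM default zone is Asia/Shanghai (the report does not state the zone) and compares the offset that java.time applies in 1900 with the one it applies in 1910. The class name LmtOffsetSketch and the helper offsetSeconds are hypothetical and are not part of Flink.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class LmtOffsetSketch {

    // Returns the UTC offset, in seconds, that the given zone applies
    // to the given local date-time according to the JVM's tz data.
    public static int offsetSeconds(String zoneId, LocalDateTime localDateTime) {
        return ZoneId.of(zoneId).getRules().getOffset(localDateTime).getTotalSeconds();
    }

    public static void main(String[] args) {
        // Assumption: the reporter's zone is not given; Asia/Shanghai is a guess.
        String zone = "Asia/Shanghai";
        LocalDateTime y1900 = LocalDateTime.of(1900, 1, 1, 0, 0);
        LocalDateTime y1910 = LocalDateTime.of(1910, 1, 1, 0, 0);

        int offset1900 = offsetSeconds(zone, y1900);
        int offset1910 = offsetSeconds(zone, y1910);

        System.out.println("offset in 1900 (s): " + offset1900);
        System.out.println("offset in 1910 (s): " + offset1910);
        System.out.println("difference (s):     " + (offset1900 - offset1910));
    }
}
```

With tz data that lists a local mean time of 8:05:43 for Shanghai before 1901, the difference is 343 seconds, which matches the 5 minute 43 second shift seen in the first row. If the zone is different, the same check can be run with that zone's ID.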