You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Rockey Cui (Jira)" <ji...@apache.org> on 2019/12/12 03:43:00 UTC

[jira] [Created] (FLINK-15212) PROCTIME attribute causes problems with timestamp times before 1900 ?

Rockey Cui created FLINK-15212:
----------------------------------

             Summary:  PROCTIME attribute causes problems with timestamp times before 1900 ?
                 Key: FLINK-15212
                 URL: https://issues.apache.org/jira/browse/FLINK-15212
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.9.1
         Environment: flink 1.9.1

jdk1.8.0_211

idea2019.3
            Reporter: Rockey Cui


A simple DataStreamSource with timestamp registered as a table.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = env.fromElements(
        "1001,1002,adc0,1900-01-01 00:00:00.0",
        "1002,1003,adc1,1910-01-01 00:00:00.0",
        "1003,1004,adc2,1920-01-01 00:00:00.0",
        "1004,1005,adc3,1930-01-01 00:00:00.0",
        "1005,1006,adc4,1970-01-01 00:00:00.0",
        "8888,6666,adc5,1971-01-01 00:00:00.0"
);
TypeInformation<?>[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, Types.STRING, Types.SQL_TIM
String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> {
    String[] split = s.split(",");
    Row row = new Row(split.length);
    for (int i = 0; i < split.length; i++) {
        Object value = null;
        if (fieldTypes[i].equals(Types.STRING)) {
            value = split[i];
        }
        if (fieldTypes[i].equals(Types.LONG)) {
            value = Long.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.INT)) {
            value = Integer.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.DOUBLE)) {
            value = Double.valueOf(split[i]);
        }
        if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
            value = Timestamp.valueOf(split[i]);
        }
        row.setField(i, value);
    }
    //System.out.println(row.toString());
    return row;
}).returns(rowTypeInfo);
tableEnv.registerDataStream("user_click_info", stream, String.join(",", fieldNames) + ",www.proctime");
String sql = "select * from user_click_info";
Table table = tableEnv.sqlQuery(sql);
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
result.print();
table.printSchema();
tableEnv.execute("Test");
{code}
result ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)
 |-- www: TIMESTAMP(3) *PROCTIME*

 

1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 03:37:18.036
1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
8888,6666,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196
----
without  PROCTIME attribute is OK ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)

 

1001,1002,adc0,1900-01-01 00:00:00.0
1002,1003,adc1,1910-01-01 00:00:00.0
1003,1004,adc2,1920-01-01 00:00:00.0
1004,1005,adc3,1930-01-01 00:00:00.0
1005,1006,adc4,1970-01-01 00:00:00.0
8888,6666,adc5,1971-01-01 00:00:00.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)