You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/01 12:55:26 UTC
[GitHub] [flink] twalthr commented on a change in pull request #17396: [FLINK-24393][test] Add CAST tests for type combinations
twalthr commented on a change in pull request #17396:
URL: https://github.com/apache/flink/pull/17396#discussion_r720176453
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
Review comment:
nit: trailing zero is remove by Java already
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DECIMAL(4, 3), "To DECIMAL")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", new BigDecimal("1.234".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), true, new BigDecimal("1.000".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), false, new BigDecimal("0.000".toCharArray(), 0, 5))
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, new BigDecimal("9.870".toCharArray(), 0, 5))
+ .testCase(TINYINT(), -125, null)
+ .testCase(SMALLINT(), 32767, null)
+ .testCase(INT(), -12345678, null)
+ .testCase(BIGINT(), 1234567891234L, null)
+ .testCase(FLOAT(), -123.456, null)
+ .testCase(DOUBLE(), 12345.67890, null)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TINYINT(), "To TINYINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (byte) 1)
+ .testCase(STRING(), "123", (byte) 123)
+ .testCase(STRING(), "-130", null)
+ .testCase(BOOLEAN(), true, (byte) 1)
+ .testCase(BOOLEAN(), false, (byte) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (byte) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 9123.87, (byte) -93)
+ .testCase(TINYINT(), -125, (byte) -125)
+ .testCase(SMALLINT(), 32, (byte) 32)
+ .testCase(SMALLINT(), 32767, (byte) -1)
+ .testCase(INT(), -12, (byte) -12)
+ .testCase(INT(), -12345678, (byte) -78)
+ .testCase(BIGINT(), 123, (byte) 123)
+ .testCase(BIGINT(), 1234567891234L, (byte) 34)
+ .testCase(FLOAT(), -123.456, (byte) -123)
+ .testCase(FLOAT(), 128.456, (byte) -128)
+ .testCase(DOUBLE(), 123.4567890, (byte) 123)
+ .testCase(DOUBLE(), 12345.67890, (byte) 57)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(SMALLINT(), "To SMALLINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (short) 1)
+ .testCase(STRING(), "123", (short) 123)
+ .testCase(STRING(), "-32769", null)
+ .testCase(BOOLEAN(), true, (short) 1)
+ .testCase(BOOLEAN(), false, (short) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (short) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 91235.87, (short) 25699)
+ .testCase(TINYINT(), -125, (short) -125)
+ .testCase(SMALLINT(), 32, (short) 32)
+ .testCase(SMALLINT(), 32780, (short) -32756)
+ .testCase(INT(), -12, (short) -12)
+ .testCase(INT(), -12345678, (short) -24910)
+ .testCase(BIGINT(), 123, (short) 123)
+ .testCase(BIGINT(), 1234567891234L, (short) 2338)
+ .testCase(FLOAT(), -123.456, (short) -123)
+ .testCase(FLOAT(), 123456.78, (short) -7616)
+ .testCase(DOUBLE(), 123.4567890, (short) 123)
+ .testCase(DOUBLE(), 123456.7890, (short) -7616)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(INT(), "To INT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1)
+ .testCase(STRING(), "123", 123)
+ .testCase(STRING(), "-3276913443134", null)
+ .testCase(BOOLEAN(), true, 1)
+ .testCase(BOOLEAN(), false, 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(20, 3), 3276913443134.87, -146603714)
+ .testCase(TINYINT(), -125, -125)
+ .testCase(SMALLINT(), 32, 32)
+ .testCase(INT(), -12345678, -12345678)
+ .testCase(BIGINT(), 123, 123)
+ .testCase(BIGINT(), 1234567891234L, 1912277282)
+ .testCase(FLOAT(), -123.456, -123)
+ .testCase(FLOAT(), 9234567891.12, 644633299)
+ .testCase(DOUBLE(), 123.4567890, 123)
+ .testCase(DOUBLE(), 9234567891.12345, 644633299)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BIGINT(), "To BIGINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1L)
+ .testCase(STRING(), "123", 123L)
+ .testCase(STRING(), "-3276913443134", -3276913443134L)
+ .testCase(BOOLEAN(), true, 1L)
+ .testCase(BOOLEAN(), false, 0L)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9L)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3276913443134L)
+ .testCase(TINYINT(), -125, -125L)
+ .testCase(SMALLINT(), 32, 32L)
+ .testCase(INT(), -12345678, -12345678L)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234L)
+ .testCase(FLOAT(), -123.456, -123L)
+ .testCase(FLOAT(), 9234567891.12, 9234567891L)
+ .testCase(DOUBLE(), 123.4567890, 123L)
+ .testCase(DOUBLE(), 9234567891.12345, 9234567891L)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(FLOAT(), "To FLOAT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234f)
+ .testCase(STRING(), "123", 123.0f)
+ .testCase(STRING(), "-3276913443134", -3.27691403E12f)
+ .testCase(BOOLEAN(), true, 1.0f)
+ .testCase(BOOLEAN(), false, 0.0f)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691351E12f)
+ .testCase(TINYINT(), -125, -125f)
+ .testCase(SMALLINT(), 32, 32f)
+ .testCase(INT(), -12345678, -12345678f)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234f)
+ .testCase(FLOAT(), -123.456, -123.456f)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12f)
+ .testCase(DOUBLE(), 123.4567890, 123.45679f)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.23923451E12f)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DOUBLE(), "To DOUBLE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234d)
+ .testCase(STRING(), "123", 123.0d)
+ .testCase(STRING(), "-3276913443134", -3.276913443134E12)
+ .testCase(BOOLEAN(), true, 1.0d)
+ .testCase(BOOLEAN(), false, 0.0d)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87d)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691344313487E12d)
+ .testCase(TINYINT(), -125, -125d)
+ .testCase(SMALLINT(), 32, 32d)
+ .testCase(INT(), -12345678, -12345678d)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234d)
+ .testCase(FLOAT(), -123.456, -123.456d)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12d)
+ .testCase(DOUBLE(), 123.4567890, 123.456789d)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.2392345678911235E12d)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DATE(), "To DATE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalDate.of(123, 1, 1))
+ .testCase(STRING(), "2021-09-27", LocalDate.of(2021, 9, 27))
+ .testCase(STRING(), "2021/09/27", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), LocalDate.of(2021, 9, 24))
+ // Not supported - no fix
+ // TIME
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
+ .toInstant(ZoneOffset.ofHours(5)),
+ LocalDate.of(2021, 9, 25))
+ // Not supported - no fix
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TIME(), "To TIME")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalTime.of(23, 1, 1))
+ .testCase(STRING(), "2021-09-27", null)
+ // https://issues.apache.org/jira/browse/FLINK-24415 Fractional seconds are
+ // lost
+ .testCase(STRING(), "12:34:56.123456789", LocalTime.of(12, 34, 56, 0))
+ .testCase(STRING(), "2021-09-27 12:34:56.123456789", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ .testCase(
+ TIME(5),
+ LocalTime.parse("12:34:56.1234567"),
+ LocalTime.of(12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalTime.of(12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalTime.of(12, 34, 56, 0))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalTime.of(12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP_LTZ(4),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
+ .toInstant(ZoneOffset.ofHours(5)),
+ LocalTime.of(1, 34, 56, 0))
+ // Not supported - no fix
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TIMESTAMP(9), "To TIMESTAMP")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "123", null)
+ .testCase(STRING(), "2021-09-27", LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0))
+ .testCase(STRING(), "2021/09/27", null)
+ .testCase(
+ STRING(),
+ "2021-09-27 12:34:56.123456789",
+ LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123456789))
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ .testCase(
+ DATE(),
+ LocalDate.parse("2021-09-24"),
+ LocalDateTime.of(2021, 9, 24, 0, 0, 0, 0))
+ // https://issues.apache.org/jira/browse/FLINK-24415 Fractional seconds are
+ // lost
+ // https://issues.apache.org/jira/browse/FLINK-24423 Continue using EPOCH
+ // date or use 0 for the year?
+ .testCase(
+ TIME(5),
+ LocalTime.parse("12:34:56.1234567"),
+ LocalDateTime.of(1970, 1, 1, 12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDateTime.of(2021, 9, 24, 12, 34, 56, 123456000))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDateTime.of(2021, 9, 24, 12, 34, 56, 123400000))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDateTime.of(2021, 9, 24, 12, 34, 56, 123400000))
+ .testCase(
+ TIMESTAMP_LTZ(4),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
+ .toInstant(ZoneOffset.ofHours(5)),
+ LocalDateTime.of(2021, 9, 25, 1, 34, 56, 123400000))
+ // Not supported - no fix
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TIMESTAMP_LTZ(9), "To TIMESTAMP_LTZ")
+ // Causes NPE
+ // .testCase(CHAR(3), "foo", null)
+ // .testCase(VARCHAR(5), "Flink", null)
+ // .testCase(STRING(), "123", null)
+ .testCase(
+ STRING(),
+ "2021-09-27",
+ fromLocalToUTC(LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)))
+ // https://issues.apache.org/jira/browse/FLINK-24415 Fractional seconds are
+ // lost
+ .testCase(
Review comment:
this is not related to FLINK-24415, we are using TIMESTAMP not TIME here
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
##########
@@ -80,10 +80,13 @@
@Parameter public TestSpec testSpec;
+ protected TableEnvironment env() {
Review comment:
It would be good to not give tests full access to the entire API. Once such a member is visible, unexperienced contributors will use it for things that are not intended. This would make it impossible to make the tests more efficient in the future. Instead, we should only expose what it really needed (e.g. functions, or configuration).
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
##########
@@ -80,10 +80,13 @@
@Parameter public TestSpec testSpec;
+ protected TableEnvironment env() {
Review comment:
let's only add `getConfiguration()` as a method for now
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
Review comment:
TIMESTAMP_LTZ should apply the timezone in the string, a timezone id/offset info is not required
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
Review comment:
I think we don't need the second string parameter, it is always equal to the first parameter. We could rename the method to make the code more readable: `testCastTo(CHAR(3)).fromCase(...).build()`
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
Review comment:
the commented code does not make much sense, let's just write `TIMESTAMP_WITH_TIME_ZONE` here without code
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
Review comment:
nit: use the `LogicalTypeRoot` for such lists, in this case `INTERVAL_YEAR_MONTH` and `INTERVAL_DAY_TIME` below
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DECIMAL(4, 3), "To DECIMAL")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", new BigDecimal("1.234".toCharArray(), 0, 5))
Review comment:
Why not `new BigDecimal(String)`?
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
Review comment:
let's support casting to `ARRAY<NUMERIC>` or at least `ARRAY<TINYINT>`? otherwise a user cannot use all the built-in array function to access elements or cardinality etc.
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DECIMAL(4, 3), "To DECIMAL")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", new BigDecimal("1.234".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), true, new BigDecimal("1.000".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), false, new BigDecimal("0.000".toCharArray(), 0, 5))
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, new BigDecimal("9.870".toCharArray(), 0, 5))
+ .testCase(TINYINT(), -125, null)
+ .testCase(SMALLINT(), 32767, null)
+ .testCase(INT(), -12345678, null)
+ .testCase(BIGINT(), 1234567891234L, null)
+ .testCase(FLOAT(), -123.456, null)
+ .testCase(DOUBLE(), 12345.67890, null)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TINYINT(), "To TINYINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (byte) 1)
+ .testCase(STRING(), "123", (byte) 123)
+ .testCase(STRING(), "-130", null)
+ .testCase(BOOLEAN(), true, (byte) 1)
+ .testCase(BOOLEAN(), false, (byte) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (byte) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 9123.87, (byte) -93)
+ .testCase(TINYINT(), -125, (byte) -125)
+ .testCase(SMALLINT(), 32, (byte) 32)
+ .testCase(SMALLINT(), 32767, (byte) -1)
+ .testCase(INT(), -12, (byte) -12)
+ .testCase(INT(), -12345678, (byte) -78)
+ .testCase(BIGINT(), 123, (byte) 123)
+ .testCase(BIGINT(), 1234567891234L, (byte) 34)
+ .testCase(FLOAT(), -123.456, (byte) -123)
+ .testCase(FLOAT(), 128.456, (byte) -128)
+ .testCase(DOUBLE(), 123.4567890, (byte) 123)
+ .testCase(DOUBLE(), 12345.67890, (byte) 57)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(SMALLINT(), "To SMALLINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (short) 1)
+ .testCase(STRING(), "123", (short) 123)
+ .testCase(STRING(), "-32769", null)
+ .testCase(BOOLEAN(), true, (short) 1)
+ .testCase(BOOLEAN(), false, (short) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (short) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 91235.87, (short) 25699)
+ .testCase(TINYINT(), -125, (short) -125)
+ .testCase(SMALLINT(), 32, (short) 32)
+ .testCase(SMALLINT(), 32780, (short) -32756)
+ .testCase(INT(), -12, (short) -12)
+ .testCase(INT(), -12345678, (short) -24910)
+ .testCase(BIGINT(), 123, (short) 123)
+ .testCase(BIGINT(), 1234567891234L, (short) 2338)
+ .testCase(FLOAT(), -123.456, (short) -123)
+ .testCase(FLOAT(), 123456.78, (short) -7616)
+ .testCase(DOUBLE(), 123.4567890, (short) 123)
+ .testCase(DOUBLE(), 123456.7890, (short) -7616)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(INT(), "To INT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1)
+ .testCase(STRING(), "123", 123)
+ .testCase(STRING(), "-3276913443134", null)
+ .testCase(BOOLEAN(), true, 1)
+ .testCase(BOOLEAN(), false, 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(20, 3), 3276913443134.87, -146603714)
+ .testCase(TINYINT(), -125, -125)
+ .testCase(SMALLINT(), 32, 32)
+ .testCase(INT(), -12345678, -12345678)
+ .testCase(BIGINT(), 123, 123)
+ .testCase(BIGINT(), 1234567891234L, 1912277282)
+ .testCase(FLOAT(), -123.456, -123)
+ .testCase(FLOAT(), 9234567891.12, 644633299)
+ .testCase(DOUBLE(), 123.4567890, 123)
+ .testCase(DOUBLE(), 9234567891.12345, 644633299)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BIGINT(), "To BIGINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1L)
+ .testCase(STRING(), "123", 123L)
+ .testCase(STRING(), "-3276913443134", -3276913443134L)
+ .testCase(BOOLEAN(), true, 1L)
+ .testCase(BOOLEAN(), false, 0L)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9L)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3276913443134L)
+ .testCase(TINYINT(), -125, -125L)
+ .testCase(SMALLINT(), 32, 32L)
+ .testCase(INT(), -12345678, -12345678L)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234L)
+ .testCase(FLOAT(), -123.456, -123L)
+ .testCase(FLOAT(), 9234567891.12, 9234567891L)
+ .testCase(DOUBLE(), 123.4567890, 123L)
+ .testCase(DOUBLE(), 9234567891.12345, 9234567891L)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(FLOAT(), "To FLOAT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234f)
+ .testCase(STRING(), "123", 123.0f)
+ .testCase(STRING(), "-3276913443134", -3.27691403E12f)
+ .testCase(BOOLEAN(), true, 1.0f)
+ .testCase(BOOLEAN(), false, 0.0f)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691351E12f)
+ .testCase(TINYINT(), -125, -125f)
+ .testCase(SMALLINT(), 32, 32f)
+ .testCase(INT(), -12345678, -12345678f)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234f)
+ .testCase(FLOAT(), -123.456, -123.456f)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12f)
+ .testCase(DOUBLE(), 123.4567890, 123.45679f)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.23923451E12f)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DOUBLE(), "To DOUBLE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234d)
+ .testCase(STRING(), "123", 123.0d)
+ .testCase(STRING(), "-3276913443134", -3.276913443134E12)
+ .testCase(BOOLEAN(), true, 1.0d)
+ .testCase(BOOLEAN(), false, 0.0d)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87d)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691344313487E12d)
+ .testCase(TINYINT(), -125, -125d)
+ .testCase(SMALLINT(), 32, 32d)
+ .testCase(INT(), -12345678, -12345678d)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234d)
+ .testCase(FLOAT(), -123.456, -123.456d)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12d)
+ .testCase(DOUBLE(), 123.4567890, 123.456789d)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.2392345678911235E12d)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DATE(), "To DATE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalDate.of(123, 1, 1))
+ .testCase(STRING(), "2021-09-27", LocalDate.of(2021, 9, 27))
+ .testCase(STRING(), "2021/09/27", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), LocalDate.of(2021, 9, 24))
+ // Not supported - no fix
+ // TIME
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
Review comment:
how does the instant look like after this? we should design the test in a way that the instant shows `24` but the resulting `LocalDate` is 25 due to the session timezone
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DECIMAL(4, 3), "To DECIMAL")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", new BigDecimal("1.234".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), true, new BigDecimal("1.000".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), false, new BigDecimal("0.000".toCharArray(), 0, 5))
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, new BigDecimal("9.870".toCharArray(), 0, 5))
+ .testCase(TINYINT(), -125, null)
+ .testCase(SMALLINT(), 32767, null)
+ .testCase(INT(), -12345678, null)
+ .testCase(BIGINT(), 1234567891234L, null)
+ .testCase(FLOAT(), -123.456, null)
+ .testCase(DOUBLE(), 12345.67890, null)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TINYINT(), "To TINYINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (byte) 1)
+ .testCase(STRING(), "123", (byte) 123)
+ .testCase(STRING(), "-130", null)
+ .testCase(BOOLEAN(), true, (byte) 1)
+ .testCase(BOOLEAN(), false, (byte) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (byte) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 9123.87, (byte) -93)
+ .testCase(TINYINT(), -125, (byte) -125)
+ .testCase(SMALLINT(), 32, (byte) 32)
+ .testCase(SMALLINT(), 32767, (byte) -1)
+ .testCase(INT(), -12, (byte) -12)
+ .testCase(INT(), -12345678, (byte) -78)
+ .testCase(BIGINT(), 123, (byte) 123)
+ .testCase(BIGINT(), 1234567891234L, (byte) 34)
+ .testCase(FLOAT(), -123.456, (byte) -123)
+ .testCase(FLOAT(), 128.456, (byte) -128)
+ .testCase(DOUBLE(), 123.4567890, (byte) 123)
+ .testCase(DOUBLE(), 12345.67890, (byte) 57)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(SMALLINT(), "To SMALLINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (short) 1)
+ .testCase(STRING(), "123", (short) 123)
+ .testCase(STRING(), "-32769", null)
+ .testCase(BOOLEAN(), true, (short) 1)
+ .testCase(BOOLEAN(), false, (short) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (short) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 91235.87, (short) 25699)
+ .testCase(TINYINT(), -125, (short) -125)
+ .testCase(SMALLINT(), 32, (short) 32)
+ .testCase(SMALLINT(), 32780, (short) -32756)
+ .testCase(INT(), -12, (short) -12)
+ .testCase(INT(), -12345678, (short) -24910)
+ .testCase(BIGINT(), 123, (short) 123)
+ .testCase(BIGINT(), 1234567891234L, (short) 2338)
+ .testCase(FLOAT(), -123.456, (short) -123)
+ .testCase(FLOAT(), 123456.78, (short) -7616)
+ .testCase(DOUBLE(), 123.4567890, (short) 123)
+ .testCase(DOUBLE(), 123456.7890, (short) -7616)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(INT(), "To INT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1)
+ .testCase(STRING(), "123", 123)
+ .testCase(STRING(), "-3276913443134", null)
+ .testCase(BOOLEAN(), true, 1)
+ .testCase(BOOLEAN(), false, 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(20, 3), 3276913443134.87, -146603714)
+ .testCase(TINYINT(), -125, -125)
+ .testCase(SMALLINT(), 32, 32)
+ .testCase(INT(), -12345678, -12345678)
+ .testCase(BIGINT(), 123, 123)
+ .testCase(BIGINT(), 1234567891234L, 1912277282)
+ .testCase(FLOAT(), -123.456, -123)
+ .testCase(FLOAT(), 9234567891.12, 644633299)
+ .testCase(DOUBLE(), 123.4567890, 123)
+ .testCase(DOUBLE(), 9234567891.12345, 644633299)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BIGINT(), "To BIGINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1L)
+ .testCase(STRING(), "123", 123L)
+ .testCase(STRING(), "-3276913443134", -3276913443134L)
+ .testCase(BOOLEAN(), true, 1L)
+ .testCase(BOOLEAN(), false, 0L)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9L)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3276913443134L)
+ .testCase(TINYINT(), -125, -125L)
+ .testCase(SMALLINT(), 32, 32L)
+ .testCase(INT(), -12345678, -12345678L)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234L)
+ .testCase(FLOAT(), -123.456, -123L)
+ .testCase(FLOAT(), 9234567891.12, 9234567891L)
+ .testCase(DOUBLE(), 123.4567890, 123L)
+ .testCase(DOUBLE(), 9234567891.12345, 9234567891L)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(FLOAT(), "To FLOAT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234f)
+ .testCase(STRING(), "123", 123.0f)
+ .testCase(STRING(), "-3276913443134", -3.27691403E12f)
+ .testCase(BOOLEAN(), true, 1.0f)
+ .testCase(BOOLEAN(), false, 0.0f)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691351E12f)
+ .testCase(TINYINT(), -125, -125f)
+ .testCase(SMALLINT(), 32, 32f)
+ .testCase(INT(), -12345678, -12345678f)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234f)
+ .testCase(FLOAT(), -123.456, -123.456f)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12f)
+ .testCase(DOUBLE(), 123.4567890, 123.45679f)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.23923451E12f)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DOUBLE(), "To DOUBLE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234d)
+ .testCase(STRING(), "123", 123.0d)
+ .testCase(STRING(), "-3276913443134", -3.276913443134E12)
+ .testCase(BOOLEAN(), true, 1.0d)
+ .testCase(BOOLEAN(), false, 0.0d)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87d)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691344313487E12d)
+ .testCase(TINYINT(), -125, -125d)
+ .testCase(SMALLINT(), 32, 32d)
+ .testCase(INT(), -12345678, -12345678d)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234d)
+ .testCase(FLOAT(), -123.456, -123.456d)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12d)
+ .testCase(DOUBLE(), 123.4567890, 123.456789d)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.2392345678911235E12d)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DATE(), "To DATE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalDate.of(123, 1, 1))
+ .testCase(STRING(), "2021-09-27", LocalDate.of(2021, 9, 27))
+ .testCase(STRING(), "2021/09/27", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), LocalDate.of(2021, 9, 24))
+ // Not supported - no fix
+ // TIME
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
+ .toInstant(ZoneOffset.ofHours(5)),
+ LocalDate.of(2021, 9, 25))
+ // Not supported - no fix
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TIME(), "To TIME")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalTime.of(23, 1, 1))
+ .testCase(STRING(), "2021-09-27", null)
+ // https://issues.apache.org/jira/browse/FLINK-24415 Fractional seconds are
+ // lost
+ .testCase(STRING(), "12:34:56.123456789", LocalTime.of(12, 34, 56, 0))
+ .testCase(STRING(), "2021-09-27 12:34:56.123456789", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ .testCase(
+ TIME(5),
+ LocalTime.parse("12:34:56.1234567"),
+ LocalTime.of(12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalTime.of(12, 34, 56, 0))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalTime.of(12, 34, 56, 0))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
Review comment:
link to https://issues.apache.org/jira/browse/FLINK-20869
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
Review comment:
for the unsupported cases, did you check that the exception came from the validation layer instead of the code generator?
##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -18,176 +18,905 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
import org.junit.runners.Parameterized;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.api.DataTypes.YEAR;
import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-import static org.apache.flink.table.api.Expressions.row;
/** Tests for {@link BuiltInFunctionDefinitions#CAST}. */
public class CastFunctionITCase extends BuiltInFunctionTestBase {
+ private static final ZoneId TEST_TZ = ZoneId.of("Asia/Shanghai");
+
+ @Override
+ protected TableEnvironment env() {
+ TableEnvironment env = super.env();
+ env.getConfig().getConfiguration().set(TableConfigOptions.LOCAL_TIME_ZONE, TEST_TZ.getId());
+ return env;
+ }
+
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with different field names")
- .onFieldsWithData(Row.of(12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<otherNameInt INT, otherNameString STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CAST, "implicit with type widening")
- .onFieldsWithData(Row.of((byte) 12, "Hello"))
- .andDataTypes(DataTypes.of("ROW<i TINYINT, s STRING>"))
- .withFunction(RowToFirstField.class)
- .testResult(
- call("RowToFirstField", $("f0")),
- "RowToFirstField(f0)",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit with nested type widening")
- .onFieldsWithData(Row.of(Row.of(12, 42), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT>, s STRING>"))
- .withFunction(NestedRowToFirstField.class)
- .testResult(
- call("NestedRowToFirstField", $("f0")),
- "NestedRowToFirstField(f0)",
- Row.of(12, 42.0),
- DataTypes.of("ROW<i INT, d DOUBLE>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and implicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)",
- Row.of(Row.of("12", true, null), "Hello"),
- // the inner NOT NULL is ignored in SQL because the outer ROW is
- // nullable and the cast does not allow setting the outer
- // nullability but derives it from the source operand
- DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit with nested rows and explicit nullability change")
- .onFieldsWithData(Row.of(Row.of(12, 42, null), "Hello"))
- .andDataTypes(DataTypes.of("ROW<r ROW<i1 INT, i2 INT, i3 INT>, s STRING>"))
- .testTableApiResult(
- $("f0").cast(
- DataTypes.ROW(
- DataTypes.FIELD(
- "r",
- DataTypes.ROW(
- DataTypes.FIELD(
- "s",
- DataTypes.STRING()
- .notNull()),
- DataTypes.FIELD(
- "b",
- DataTypes
- .BOOLEAN()),
- DataTypes.FIELD(
- "i",
- DataTypes.INT()))),
- DataTypes.FIELD("s", DataTypes.STRING()))),
- Row.of(Row.of("12", true, null), "Hello"),
- DataTypes.of(
- "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "implicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .withFunction(RowToFirstField.class)
- .testResult(
- call(
- "RowToFirstField",
- call("StructuredTypeConstructor", row($("f0"), $("f1")))),
- "RowToFirstField(StructuredTypeConstructor((f0, f1)))",
- 12,
- DataTypes.INT()),
- TestSpec.forFunction(
- BuiltInFunctionDefinitions.CAST,
- "explicit between structured type and row")
- .onFieldsWithData(12, "Ingo")
- .withFunction(StructuredTypeConstructor.class)
- .testTableApiResult(
- call("StructuredTypeConstructor", row($("f0"), $("f1")))
- .cast(
- DataTypes.ROW(
- DataTypes.BIGINT(), DataTypes.STRING())),
- Row.of(12L, "Ingo"),
- DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING())));
+ CastTestSpecBuilder.test(CHAR(3), "To CHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef") // "abc"
+ .testCase(DATE(), LocalDate.parse("2021-30-09"), "2021-30-09") // "202"
+ .build(),
+ CastTestSpecBuilder.test(VARCHAR(3), "To VARCHAR(3)")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(4), "foo", "foo ")
+ .testCase(CHAR(4), "foo ", "foo ")
+ .testCase(VARCHAR(3), "foo", "foo")
+ .testCase(VARCHAR(5), "foo", "foo")
+ .testCase(VARCHAR(5), "foo ", "foo ")
+ // https://issues.apache.org/jira/browse/FLINK-24413 - Trim to precision
+ // in this case down to 3 chars
+ .testCase(STRING(), "abcdef", "abcdef")
+ .build(),
+ CastTestSpecBuilder.test(STRING(), "To STRING")
+ .testCase(CHAR(3), "foo", "foo")
+ .testCase(CHAR(5), "foo", "foo ")
+ .testCase(VARCHAR(5), "Flink", "Flink")
+ .testCase(VARCHAR(10), "Flink", "Flink")
+ .testCase(STRING(), "Apache Flink", "Apache Flink")
+ .testCase(STRING(), null, null)
+ .testCase(BOOLEAN(), true, "true")
+ .testCase(BINARY(2), new byte[] {0, 1}, "\u0000\u0001")
+ .testCase(BINARY(3), new byte[] {0, 1}, "\u0000\u0001\u0000")
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(VARBINARY(5), new byte[] {0, 1, 2}, "\u0000\u0001\u0002")
+ .testCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+ "\u0000\u0001\u0002\u0003\u0004")
+ .testCase(DECIMAL(4, 3), 9.87, "9.870")
+ // https://issues.apache.org/jira/browse/FLINK-24403 - Left zero padding
+ // currently not working
+ // .testCase(DECIMAL(5, 3), 09.87, "09.870")
+ .testCase(TINYINT(), -125, "-125")
+ .testCase(SMALLINT(), 32767, "32767")
+ .testCase(INT(), -12345678, "-12345678")
+ .testCase(BIGINT(), 1234567891234L, "1234567891234")
+ .testCase(FLOAT(), -123.456, "-123.456")
+ .testCase(DOUBLE(), 12345.67890, "12345.6789")
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), "2021-09-24")
+ // https://issues.apache.org/jira/browse/FLINK-24415 Currently, fractional
+ // seconds are lost
+ .testCase(TIME(5), LocalTime.parse("12:34:56.1234567"), "12:34:56")
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.1234")
+ // Not supported
+ // .testCase(TIMESTAMP_WITH_TIME_ZONE(),
+ // LocalDateTime.parse("2021-09-24T12:34:56.123456"), "2021-09-24
+ // 12:34:56.123456+0200")
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24416 No timezone id/offset
+ // info
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ "2021-09-24 12:34:56.123456")
+ .testCase(INTERVAL(YEAR()), 84, "+7-00")
+ .testCase(INTERVAL(MONTH()), 5, "+0-05")
+ .testCase(INTERVAL(MONTH()), 123, "+10-03")
+ .testCase(INTERVAL(MONTH()), 12334, "+1027-10")
+ .testCase(INTERVAL(DAY()), 10, "+0 00:00:00.010")
+ .testCase(INTERVAL(DAY()), 123456789L, "+1 10:17:36.789")
+ .testCase(INTERVAL(DAY()), Duration.ofHours(36), "+1 12:00:00.000")
+ // https://issues.apache.org/jira/browse/FLINK-21456 Not supported currently
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BOOLEAN(), "To BOOLEAN")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(CHAR(4), "true", true)
+ .testCase(VARCHAR(5), "FalsE", false)
+ .testCase(STRING(), "Apache Flink", null)
+ .testCase(STRING(), "TRUE", true)
+ .testCase(STRING(), "", null)
+ .testCase(BOOLEAN(), true, true)
+ .testCase(BOOLEAN(), false, false)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, true)
+ .testCase(DECIMAL(2, 1), 0.0, false)
+ .testCase(TINYINT(), -125, true)
+ .testCase(TINYINT(), 0, false)
+ .testCase(SMALLINT(), 32767, true)
+ .testCase(SMALLINT(), 0, false)
+ .testCase(INT(), -12345678, true)
+ .testCase(INT(), 0, false)
+ .testCase(BIGINT(), 1234567891234L, true)
+ .testCase(BIGINT(), 0, false)
+ .testCase(FLOAT(), -123.456, true)
+ .testCase(FLOAT(), 0, false)
+ .testCase(DOUBLE(), 12345.67890, true)
+ .testCase(DOUBLE(), 0, false)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BINARY(2), "To BINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(VARBINARY(4), "To VARBINARY")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // .testCase(RAW(byte[].class), new byte[] {1}, new byte[] {1})
+ .build(),
+ CastTestSpecBuilder.test(BYTES(), "To BYTES")
+ .testCase(CHAR(3), "foo", new byte[] {102, 111, 111})
+ .testCase(VARCHAR(5), "Flink", new byte[] {70, 108, 105, 110, 107})
+ .testCase(STRING(), "Apache", new byte[] {65, 112, 97, 99, 104, 101})
+ // Not supported - no fix
+ // BOOLEAN
+ .testCase(BINARY(2), new byte[] {0, 1}, new byte[] {0, 1})
+ .testCase(VARBINARY(3), new byte[] {0, 1, 2}, new byte[] {0, 1, 2})
+ .testCase(BYTES(), new byte[] {0, 1, 2, 3, 4}, new byte[] {0, 1, 2, 3, 4})
+ // Not supported - no fix
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24418
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DECIMAL(4, 3), "To DECIMAL")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", new BigDecimal("1.234".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), true, new BigDecimal("1.000".toCharArray(), 0, 5))
+ .testCase(BOOLEAN(), false, new BigDecimal("0.000".toCharArray(), 0, 5))
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, new BigDecimal("9.870".toCharArray(), 0, 5))
+ .testCase(TINYINT(), -125, null)
+ .testCase(SMALLINT(), 32767, null)
+ .testCase(INT(), -12345678, null)
+ .testCase(BIGINT(), 1234567891234L, null)
+ .testCase(FLOAT(), -123.456, null)
+ .testCase(DOUBLE(), 12345.67890, null)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(TINYINT(), "To TINYINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (byte) 1)
+ .testCase(STRING(), "123", (byte) 123)
+ .testCase(STRING(), "-130", null)
+ .testCase(BOOLEAN(), true, (byte) 1)
+ .testCase(BOOLEAN(), false, (byte) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (byte) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 9123.87, (byte) -93)
+ .testCase(TINYINT(), -125, (byte) -125)
+ .testCase(SMALLINT(), 32, (byte) 32)
+ .testCase(SMALLINT(), 32767, (byte) -1)
+ .testCase(INT(), -12, (byte) -12)
+ .testCase(INT(), -12345678, (byte) -78)
+ .testCase(BIGINT(), 123, (byte) 123)
+ .testCase(BIGINT(), 1234567891234L, (byte) 34)
+ .testCase(FLOAT(), -123.456, (byte) -123)
+ .testCase(FLOAT(), 128.456, (byte) -128)
+ .testCase(DOUBLE(), 123.4567890, (byte) 123)
+ .testCase(DOUBLE(), 12345.67890, (byte) 57)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(SMALLINT(), "To SMALLINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", (short) 1)
+ .testCase(STRING(), "123", (short) 123)
+ .testCase(STRING(), "-32769", null)
+ .testCase(BOOLEAN(), true, (short) 1)
+ .testCase(BOOLEAN(), false, (short) 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, (short) 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(10, 3), 91235.87, (short) 25699)
+ .testCase(TINYINT(), -125, (short) -125)
+ .testCase(SMALLINT(), 32, (short) 32)
+ .testCase(SMALLINT(), 32780, (short) -32756)
+ .testCase(INT(), -12, (short) -12)
+ .testCase(INT(), -12345678, (short) -24910)
+ .testCase(BIGINT(), 123, (short) 123)
+ .testCase(BIGINT(), 1234567891234L, (short) 2338)
+ .testCase(FLOAT(), -123.456, (short) -123)
+ .testCase(FLOAT(), 123456.78, (short) -7616)
+ .testCase(DOUBLE(), 123.4567890, (short) 123)
+ .testCase(DOUBLE(), 123456.7890, (short) -7616)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(INT(), "To INT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1)
+ .testCase(STRING(), "123", 123)
+ .testCase(STRING(), "-3276913443134", null)
+ .testCase(BOOLEAN(), true, 1)
+ .testCase(BOOLEAN(), false, 0)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(20, 3), 3276913443134.87, -146603714)
+ .testCase(TINYINT(), -125, -125)
+ .testCase(SMALLINT(), 32, 32)
+ .testCase(INT(), -12345678, -12345678)
+ .testCase(BIGINT(), 123, 123)
+ .testCase(BIGINT(), 1234567891234L, 1912277282)
+ .testCase(FLOAT(), -123.456, -123)
+ .testCase(FLOAT(), 9234567891.12, 644633299)
+ .testCase(DOUBLE(), 123.4567890, 123)
+ .testCase(DOUBLE(), 9234567891.12345, 644633299)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(BIGINT(), "To BIGINT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1L)
+ .testCase(STRING(), "123", 123L)
+ .testCase(STRING(), "-3276913443134", -3276913443134L)
+ .testCase(BOOLEAN(), true, 1L)
+ .testCase(BOOLEAN(), false, 0L)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9L)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3276913443134L)
+ .testCase(TINYINT(), -125, -125L)
+ .testCase(SMALLINT(), 32, 32L)
+ .testCase(INT(), -12345678, -12345678L)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234L)
+ .testCase(FLOAT(), -123.456, -123L)
+ .testCase(FLOAT(), 9234567891.12, 9234567891L)
+ .testCase(DOUBLE(), 123.4567890, 123L)
+ .testCase(DOUBLE(), 9234567891.12345, 9234567891L)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(FLOAT(), "To FLOAT")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234f)
+ .testCase(STRING(), "123", 123.0f)
+ .testCase(STRING(), "-3276913443134", -3.27691403E12f)
+ .testCase(BOOLEAN(), true, 1.0f)
+ .testCase(BOOLEAN(), false, 0.0f)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ // https://issues.apache.org/jira/browse/FLINK-24420 - Check out of range
+ // instead of overflow
+ .testCase(DECIMAL(4, 3), 9.87, 9.87f)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691351E12f)
+ .testCase(TINYINT(), -125, -125f)
+ .testCase(SMALLINT(), 32, 32f)
+ .testCase(INT(), -12345678, -12345678f)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234f)
+ .testCase(FLOAT(), -123.456, -123.456f)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12f)
+ .testCase(DOUBLE(), 123.4567890, 123.45679f)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.23923451E12f)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DOUBLE(), "To DOUBLE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ .testCase(STRING(), "Apache", null)
+ .testCase(STRING(), "1.234", 1.234d)
+ .testCase(STRING(), "123", 123.0d)
+ .testCase(STRING(), "-3276913443134", -3.276913443134E12)
+ .testCase(BOOLEAN(), true, 1.0d)
+ .testCase(BOOLEAN(), false, 0.0d)
+ // Not supported - no fix
+ // BINARY
+ // VARBINARY
+ // BYTES
+ .testCase(DECIMAL(4, 3), 9.87, 9.87d)
+ .testCase(DECIMAL(20, 3), 3276913443134.87, 3.27691344313487E12d)
+ .testCase(TINYINT(), -125, -125d)
+ .testCase(SMALLINT(), 32, 32d)
+ .testCase(INT(), -12345678, -12345678d)
+ .testCase(BIGINT(), 1234567891234L, 1234567891234d)
+ .testCase(FLOAT(), -123.456, -123.456d)
+ .testCase(FLOAT(), 9234567891.12, 9234567891.12d)
+ .testCase(DOUBLE(), 123.4567890, 123.456789d)
+ .testCase(DOUBLE(), 1239234567891.1234567891234, 1.2392345678911235E12d)
+ // Not supported - no fix
+ // DATE
+ // TIME
+ // TIMESTAMP
+ // TIMESTAMP_WITH_TIME_ZONE
+ // TIMESTAMP_LTZ
+ // INTERVAL(YEAR())
+ // INTERVAL(DAY())
+ // ARRAY
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
+ CastTestSpecBuilder.test(DATE(), "To DATE")
+ .testCase(CHAR(3), "foo", null)
+ .testCase(VARCHAR(5), "Flink", null)
+ // https://issues.apache.org/jira/browse/FLINK-24421 Bug
+ .testCase(STRING(), "123", LocalDate.of(123, 1, 1))
+ .testCase(STRING(), "2021-09-27", LocalDate.of(2021, 9, 27))
+ .testCase(STRING(), "2021/09/27", null)
+ // Not supported - no fix
+ // BOOLEAN
+ // BINARY
+ // VARBINARY
+ // BYTES
+ // DECIMAL
+ // TINYINT
+ // SMALLINT
+ // INT
+ // BIGINT
+ // FLOAT
+ // DOUBLE
+ .testCase(DATE(), LocalDate.parse("2021-09-24"), LocalDate.of(2021, 9, 24))
+ // Not supported - no fix
+ // TIME
+ .testCase(
+ TIMESTAMP(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP(4),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ // Not supported currently
+ // TIMESTAMP_WITH_TIME_ZONE
+ //
+ // https://issues.apache.org/jira/browse/FLINK-24422 - Accept only Instant
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T12:34:56.123456"),
+ LocalDate.of(2021, 9, 24))
+ .testCase(
+ TIMESTAMP_LTZ(),
+ LocalDateTime.parse("2021-09-24T22:34:56.123456")
Review comment:
or reference `TEST_TZ` instead of `5`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org