You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/22 07:05:53 UTC
[flink] 02/03: [FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0426d8c7af0191f75e6aaa4696b3358de059dc67
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 15 13:40:34 2021 +0100
[FLINK-24781][table-planner] Add string parsing methods to BinaryStringDataUtil and add from string cast rules
Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
.../apache/flink/table/utils/DateTimeUtils.java | 5 +-
.../AbstractExpressionCodeGeneratorCastRule.java | 9 +-
.../functions/casting/CastRuleProvider.java | 8 +
.../CodeGeneratedExpressionCastExecutor.java | 1 -
.../functions/casting/StringToBinaryCastRule.java | 50 +++++
.../functions/casting/StringToBooleanCastRule.java | 55 ++++++
.../functions/casting/StringToDateCastRule.java | 55 ++++++
.../functions/casting/StringToDecimalCastRule.java | 63 ++++++
.../casting/StringToNumericPrimitiveCastRule.java | 78 ++++++++
.../functions/casting/StringToTimeCastRule.java | 58 ++++++
.../casting/StringToTimestampCastRule.java | 63 ++++++
.../planner/codegen/calls/BuiltInMethods.scala | 68 +++++--
.../planner/codegen/calls/ScalarOperatorGens.scala | 105 ----------
.../planner/functions/casting/CastRulesTest.java | 211 ++++++++++++++++++++-
.../table/data/binary/BinaryStringDataUtil.java | 169 ++++++++++-------
.../flink/table/data/BinaryStringDataTest.java | 41 ++--
16 files changed, 826 insertions(+), 213 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index fdd715b..06fc883 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -317,6 +317,7 @@ public class DateTimeUtils {
// --------------------------------------------------------------------------------------------
// String --> Timestamp conversion
// --------------------------------------------------------------------------------------------
+
public static TimestampData toTimestampData(String dateStr) {
int length = dateStr.length();
String format;
@@ -411,7 +412,7 @@ public class DateTimeUtils {
* @param format date time string format
* @param tz the time zone
*/
- public static Long toTimestamp(String dateStr, String format, TimeZone tz) {
+ private static Long toTimestamp(String dateStr, String format, TimeZone tz) {
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
formatter.setTimeZone(tz);
try {
@@ -1717,7 +1718,7 @@ public class DateTimeUtils {
return timeStringToUnixDate(v, 0);
}
- public static Integer timeStringToUnixDate(String v, int start) {
+ private static Integer timeStringToUnixDate(String v, int start) {
final int colon1 = v.indexOf(':', start);
// timezone hh:mm:ss[.ssssss][[+|-]hh:mm:ss]
// refer https://www.w3.org/TR/NOTE-datetime
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
index aa0a50b..6700a7e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java
@@ -73,7 +73,14 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT>
box(
generateExpression(
createCodeGeneratorCastRuleContext(context),
- unbox(inputArgumentName, inputLogicalType),
+ unbox(
+ // We need the casting because the rules uses the
+ // concrete classes (e.g. StringData and
+ // BinaryStringData)
+ cast(
+ boxedTypeTermForType(inputLogicalType),
+ inputArgumentName),
+ inputLogicalType),
inputLogicalType,
targetLogicalType),
targetLogicalType));
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index b098191..cb5dfa2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -58,6 +58,14 @@ public class CastRuleProvider {
.addRule(MapAndMultisetToStringCastRule.INSTANCE)
.addRule(RowToStringCastRule.INSTANCE)
.addRule(RawToStringCastRule.INSTANCE)
+ // From string rules
+ .addRule(StringToBooleanCastRule.INSTANCE)
+ .addRule(StringToDecimalCastRule.INSTANCE)
+ .addRule(StringToNumericPrimitiveCastRule.INSTANCE)
+ .addRule(StringToDateCastRule.INSTANCE)
+ .addRule(StringToTimeCastRule.INSTANCE)
+ .addRule(StringToTimestampCastRule.INSTANCE)
+ .addRule(StringToBinaryCastRule.INSTANCE)
// Collection rules
.addRule(ArrayToArrayCastRule.INSTANCE)
// Special rules
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
index f39089a..7c361ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratedExpressionCastExecutor.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.functions.casting;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.utils.CastExecutor;
-import org.apache.flink.util.FlinkRuntimeException;
import org.codehaus.janino.ExpressionEvaluator;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
new file mode 100644
index 0000000..2a70de4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBinaryCastRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#BINARY_STRING} cast rule.
+ */
+class StringToBinaryCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, byte[]> {
+
+    /** Shared instance; the rule holds no per-cast state. */
+    static final StringToBinaryCastRule INSTANCE = new StringToBinaryCastRule();
+
+    private StringToBinaryCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeFamily.BINARY_STRING).build());
+    }
+
+    /** Generates a {@code toBytes()} call on the input; the target length is not consulted. */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return methodCall(inputTerm, "toBytes");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
new file mode 100644
index 0000000..b8730eb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToBooleanCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BOOLEAN;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#BOOLEAN} cast rule. */
+class StringToBooleanCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Boolean> {
+
+    static final StringToBooleanCastRule INSTANCE = new StringToBooleanCastRule();
+
+    private StringToBooleanCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeRoot.BOOLEAN).build());
+    }
+
+    /** Delegates parsing to {@code BinaryStringDataUtil.toBoolean} via STRING_DATA_TO_BOOLEAN. */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_BOOLEAN(), inputTerm);
+    }
+
+    /** Input text is not guaranteed to be a boolean literal, so this cast may fail. */
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
new file mode 100644
index 0000000..2a31f22
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDateCastRule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DATE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DATE} cast rule. */
+class StringToDateCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToDateCastRule INSTANCE = new StringToDateCastRule();
+
+    private StringToDateCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeRoot.DATE).build());
+    }
+
+    /** Delegates parsing to {@code BinaryStringDataUtil.toDate} via STRING_DATA_TO_DATE. */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_DATE(), inputTerm);
+    }
+
+    /** Input text is not guaranteed to be a valid date, so this cast may fail. */
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
new file mode 100644
index 0000000..77108d8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToDecimalCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DECIMAL;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#DECIMAL} cast rule. */
+class StringToDecimalCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, DecimalData> {
+
+    static final StringToDecimalCastRule INSTANCE = new StringToDecimalCastRule();
+
+    private StringToDecimalCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeRoot.DECIMAL).build());
+    }
+
+    /**
+     * Passes the precision and scale of the target {@link DecimalType} to STRING_DATA_TO_DECIMAL.
+     */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final DecimalType decimalTarget = (DecimalType) targetLogicalType;
+        final int precision = decimalTarget.getPrecision();
+        final int scale = decimalTarget.getScale();
+        return staticCall(STRING_DATA_TO_DECIMAL(), inputTerm, precision, scale);
+    }
+
+    /** Input text is not guaranteed to be a decimal literal, so this cast may fail. */
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
new file mode 100644
index 0000000..cec9ca8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToNumericPrimitiveCastRule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_BYTE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_DOUBLE;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_FLOAT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_INT;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_LONG;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_SHORT;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#INTEGER_NUMERIC} and
+ * {@link LogicalTypeFamily#APPROXIMATE_NUMERIC} cast rule.
+ */
+class StringToNumericPrimitiveCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, Number> {
+
+    static final StringToNumericPrimitiveCastRule INSTANCE = new StringToNumericPrimitiveCastRule();
+
+    private StringToNumericPrimitiveCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeFamily.INTEGER_NUMERIC)
+                .target(LogicalTypeFamily.APPROXIMATE_NUMERIC).build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        switch (targetLogicalType.getTypeRoot()) {
+            case TINYINT:
+                return staticCall(STRING_DATA_TO_BYTE(), inputTerm);
+            case SMALLINT:
+                return staticCall(STRING_DATA_TO_SHORT(), inputTerm);
+            case INTEGER:
+                return staticCall(STRING_DATA_TO_INT(), inputTerm);
+            case BIGINT:
+                return staticCall(STRING_DATA_TO_LONG(), inputTerm);
+            case FLOAT:
+                return staticCall(STRING_DATA_TO_FLOAT(), inputTerm);
+            case DOUBLE:
+                return staticCall(STRING_DATA_TO_DOUBLE(), inputTerm);
+        }
+        // Only reachable if the rule's predicate starts admitting further type roots.
+        throw new IllegalArgumentException(
+                "Unexpected type " + targetLogicalType + ". This is a bug. Please file an issue.");
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
new file mode 100644
index 0000000..4c14b78
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimeCastRule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIME;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} cast
+ * rule.
+ */
+class StringToTimeCastRule extends AbstractExpressionCodeGeneratorCastRule<StringData, Integer> {
+
+    static final StringToTimeCastRule INSTANCE = new StringToTimeCastRule();
+
+    private StringToTimeCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE).build());
+    }
+
+    /** Delegates parsing to {@code BinaryStringDataUtil.toTime} via STRING_DATA_TO_TIME. */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        return staticCall(STRING_DATA_TO_TIME(), inputTerm);
+    }
+
+    /** Input text is not guaranteed to be a valid time, so this cast may fail. */
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
new file mode 100644
index 0000000..96e40b3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/StringToTimestampCastRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP;
+import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.STRING_DATA_TO_TIMESTAMP_WITH_ZONE;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/** {@link LogicalTypeFamily#CHARACTER_STRING} to {@link LogicalTypeFamily#TIMESTAMP} cast rule. */
+class StringToTimestampCastRule
+        extends AbstractExpressionCodeGeneratorCastRule<StringData, TimestampData> {
+
+    static final StringToTimestampCastRule INSTANCE = new StringToTimestampCastRule();
+
+    private StringToTimestampCastRule() {
+        super(CastRulePredicate.builder()
+                .input(LogicalTypeFamily.CHARACTER_STRING)
+                .target(LogicalTypeFamily.TIMESTAMP).build());
+    }
+
+    /** Uses the session time zone for every TIMESTAMP root except TIMESTAMP_WITHOUT_TIME_ZONE. */
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        if (!targetLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+            return staticCall(
+                    STRING_DATA_TO_TIMESTAMP_WITH_ZONE(),
+                    inputTerm,
+                    context.getSessionTimeZoneTerm());
+        }
+        return staticCall(STRING_DATA_TO_TIMESTAMP(), inputTerm);
+    }
+
+    @Override
+    public boolean canFail() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 15132b0..308826d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -22,14 +22,14 @@ import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, TimestampData
import org.apache.flink.table.runtime.functions._
import org.apache.flink.table.utils.DateTimeUtils
import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
-
import org.apache.calcite.linq4j.tree.Types
import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.BinaryStringData
+import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUtil}
import java.lang.reflect.Method
import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => JShort}
+import java.time.ZoneId
import java.util.TimeZone
object BuiltInMethods {
@@ -402,16 +402,6 @@ object BuiltInMethods {
"toTimestampData",
classOf[String], classOf[String])
- val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
- classOf[DateTimeUtils],
- "toTimestamp",
- classOf[String], classOf[TimeZone])
-
- val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
- classOf[DateTimeUtils],
- "toTimestamp",
- classOf[String], classOf[String], classOf[TimeZone])
-
val TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE = Types.lookupMethod(
classOf[DateTimeUtils],
"timestampWithLocalZoneToDate",
@@ -547,6 +537,60 @@ object BuiltInMethods {
val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString",
classOf[String])
+ val STRING_DATA_TO_BOOLEAN = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toBoolean",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_DECIMAL = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toDecimal",
+ classOf[BinaryStringData],
+ classOf[Int],
+ classOf[Int])
+
+ val STRING_DATA_TO_LONG = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toLong",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_INT = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toInt",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_SHORT = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toShort",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_BYTE = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toByte",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_FLOAT = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toFloat",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_DOUBLE = Types.lookupMethod(
+ classOf[BinaryStringDataUtil],
+ "toDouble",
+ classOf[BinaryStringData])
+
+ val STRING_DATA_TO_DATE = Types.lookupMethod(
+ classOf[BinaryStringDataUtil], "toDate", classOf[BinaryStringData])
+
+ val STRING_DATA_TO_TIME = Types.lookupMethod(
+ classOf[BinaryStringDataUtil], "toTime", classOf[BinaryStringData])
+
+ val STRING_DATA_TO_TIMESTAMP = Types.lookupMethod(
+ classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData])
+
+ val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
+ classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], classOf[TimeZone])
+
// DecimalData functions
val DECIMAL_TO_DECIMAL = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 0cb0bea..8554e4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -1094,111 +1094,6 @@ object ScalarOperatorGens {
operandTerm => s"$operandTerm.toBytes($serTerm)"
}
- // String -> Boolean
- case (VARCHAR | CHAR, BOOLEAN) =>
- val castedExpression = generateUnaryOperatorIfNotNull(
- ctx,
- targetType,
- operand,
- resultNullable = true) {
- operandTerm => s"$BINARY_STRING_UTIL.toBooleanSQL($operandTerm)"
- }
- val resultTerm = newName("primitiveCastResult")
- castedExpression.copy(
- resultTerm = resultTerm,
- code =
- s"""
- |${castedExpression.code}
- |boolean $resultTerm = Boolean.TRUE.equals(${castedExpression.resultTerm});
- |""".stripMargin
- )
-
- // String -> NUMERIC TYPE (not Character)
- case (VARCHAR | CHAR, _)
- if isNumeric(targetType) =>
- targetType match {
- case dt: DecimalType =>
- generateUnaryOperatorIfNotNull(ctx, targetType, operand) { operandTerm =>
- s"$BINARY_STRING_UTIL.toDecimal($operandTerm, ${dt.getPrecision}, ${dt.getScale})"
- }
- case _ =>
- val methodName = targetType.getTypeRoot match {
- case TINYINT => "toByte"
- case SMALLINT => "toShort"
- case INTEGER => "toInt"
- case BIGINT => "toLong"
- case DOUBLE => "toDouble"
- case FLOAT => "toFloat"
- case _ => null
- }
- assert(methodName != null, "Unexpected data type.")
- generateUnaryOperatorIfNotNull(
- ctx,
- targetType,
- operand,
- resultNullable = true) {
- operandTerm => s"($BINARY_STRING_UTIL.$methodName($operandTerm.trim()))"
- }
- }
-
- // String -> Date
- case (VARCHAR | CHAR, DATE) =>
- generateUnaryOperatorIfNotNull(
- ctx,
- targetType,
- operand,
- resultNullable = true) {
- operandTerm =>
- s"${qualifyMethod(BuiltInMethods.STRING_TO_DATE)}($operandTerm.toString())"
- }
-
- // String -> Time
- case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) =>
- generateUnaryOperatorIfNotNull(
- ctx,
- targetType,
- operand,
- resultNullable = true) {
- operandTerm =>
- s"${qualifyMethod(BuiltInMethods.STRING_TO_TIME)}($operandTerm.toString())"
- }
-
- // String -> Timestamp
- case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) =>
- generateUnaryOperatorIfNotNull(
- ctx,
- targetType,
- operand,
- resultNullable = true) {
- operandTerm =>
- s"""
- |${qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP)}($operandTerm.toString())
- """.stripMargin
- }
-
- case (VARCHAR | CHAR, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
- generateCallWithStmtIfArgsNotNull(
- ctx, targetType, Seq(operand), resultNullable = true) { operands =>
- val zone = ctx.addReusableSessionTimeZone()
- val method = qualifyMethod(BuiltInMethods.STRING_TO_TIMESTAMP_TIME_ZONE)
- val toTimestampResultName = newName("toTimestampResult")
- // this method call might return null
- val stmt = s"Long $toTimestampResultName = $method(${operands.head}.toString(), $zone);"
- val result =
- s"""
- |($toTimestampResultName == null ?
- | null :
- | $TIMESTAMP_DATA.fromEpochMillis($toTimestampResultName))
- |""".stripMargin
- (stmt, result)
- }
-
- // String -> binary
- case (VARCHAR | CHAR, VARBINARY | BINARY) =>
- generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
- operandTerm => s"$operandTerm.toBytes()"
- }
-
// Note: SQL2003 $6.12 - casting is not allowed between boolean and numeric types.
// Calcite does not allow it either.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 3652787..127ece8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.functions.casting;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
@@ -80,6 +81,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.apache.flink.table.api.DataTypes.VARBINARY;
import static org.apache.flink.table.api.DataTypes.VARCHAR;
import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -90,9 +92,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
*/
class CastRulesTest {
+ private static final ZoneId CET = ZoneId.of("CET");
+
private static final CastRule.Context CET_CONTEXT =
- CastRule.Context.create(
- ZoneId.of("CET"), Thread.currentThread().getContextClassLoader());
+ CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader());
private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5;
private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5;
@@ -125,6 +128,12 @@ class CastRulesTest {
return Stream.of(
CastTestSpecBuilder.testCastTo(TINYINT())
.fromCase(TINYINT(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), (byte) 1)
+ .fromCase(STRING(), StringData.fromString("123"), (byte) 123)
+ .fail(STRING(), StringData.fromString("-130"), TableException.class)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -151,6 +160,12 @@ class CastRulesTest {
.fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (byte) -123),
CastTestSpecBuilder.testCastTo(SMALLINT())
.fromCase(SMALLINT(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), (short) 1)
+ .fromCase(STRING(), StringData.fromString("123"), (short) 123)
+ .fail(STRING(), StringData.fromString("-32769"), TableException.class)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -187,6 +202,15 @@ class CastRulesTest {
.fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, (short) -123)
.fromCase(DOUBLE(), 123456.7890d, (short) -7616),
CastTestSpecBuilder.testCastTo(INT())
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), 1)
+ .fromCase(STRING(), StringData.fromString("123"), 123)
+ .fail(
+ STRING(),
+ StringData.fromString("-3276913443134"),
+ TableException.class)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -229,6 +253,13 @@ class CastRulesTest {
.fromCase(INTERVAL(DAY(), SECOND()), 123L, 123),
CastTestSpecBuilder.testCastTo(BIGINT())
.fromCase(BIGINT(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), 1L)
+ .fromCase(STRING(), StringData.fromString("123"), 123L)
+ .fromCase(
+ STRING(), StringData.fromString("-3276913443134"), -3276913443134L)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -266,6 +297,13 @@ class CastRulesTest {
.fromCase(DOUBLE(), 9234567891.12345d, 9234567891L),
CastTestSpecBuilder.testCastTo(FLOAT())
.fromCase(FLOAT(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), 1.234f)
+ .fromCase(STRING(), StringData.fromString("123"), 123.0f)
+ .fromCase(
+ STRING(), StringData.fromString("-3276913443134"), -3.27691351E12f)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -307,6 +345,15 @@ class CastRulesTest {
.fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.23923451E12f),
CastTestSpecBuilder.testCastTo(DOUBLE())
.fromCase(DOUBLE(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("1.234"), 1.234d)
+ .fromCase(STRING(), StringData.fromString("123"), 123.0d)
+ .fromCase(
+ STRING(),
+ StringData.fromString("-3276913443134"),
+ -3.276913443134E12d)
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -349,6 +396,92 @@ class CastRulesTest {
.fromCase(DOUBLE(), DEFAULT_POSITIVE_DOUBLE, DEFAULT_POSITIVE_DOUBLE)
.fromCase(DOUBLE(), DEFAULT_NEGATIVE_DOUBLE, DEFAULT_NEGATIVE_DOUBLE)
.fromCase(DOUBLE(), 1239234567891.1234567891234d, 1.2392345678911235E12d),
+ CastTestSpecBuilder.testCastTo(DATE())
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("123"),
+ DateTimeUtils.localDateToUnixDate(LocalDate.of(123, 1, 1)))
+ .fromCase(
+ STRING(),
+ StringData.fromString("2021-09-27"),
+ DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+ .fromCase(
+ STRING(),
+ StringData.fromString("2021-09-27 12:34:56.123456789"),
+ DateTimeUtils.localDateToUnixDate(LocalDate.of(2021, 9, 27)))
+ .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class),
+ CastTestSpecBuilder.testCastTo(TIME())
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("23"),
+ DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 0, 0)))
+ .fromCase(
+ STRING(),
+ StringData.fromString("23:45"),
+ DateTimeUtils.localTimeToUnixDate(LocalTime.of(23, 45, 0)))
+ .fail(STRING(), StringData.fromString("2021-09-27"), TableException.class)
+ .fail(
+ STRING(),
+ StringData.fromString("2021-09-27 12:34:56"),
+ TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("12:34:56.123456789"),
+ DateTimeUtils.localTimeToUnixDate(
+ LocalTime.of(12, 34, 56, 123_000_000)))
+ .fail(
+ STRING(),
+ StringData.fromString("2021-09-27 12:34:56.123456789"),
+ TableException.class),
+ CastTestSpecBuilder.testCastTo(TIMESTAMP(9))
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("123"), TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("2021-09-27"),
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)))
+ .fail(STRING(), StringData.fromString("2021/09/27"), TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("2021-09-27 12:34:56.123456789"),
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123456789))),
+ CastTestSpecBuilder.testCastTo(TIMESTAMP_LTZ(9))
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("123"), TableException.class)
+ .fromCase(
+ STRING(),
+ CET_CONTEXT,
+ StringData.fromString("2021-09-27"),
+ TimestampData.fromInstant(
+ LocalDateTime.of(2021, 9, 27, 0, 0, 0, 0)
+ .atZone(CET)
+ .toInstant()))
+ .fromCase(
+ STRING(),
+ CET_CONTEXT,
+ StringData.fromString("2021-09-27 12:34:56.123"),
+ TimestampData.fromInstant(
+ LocalDateTime.of(2021, 9, 27, 12, 34, 56, 123000000)
+ .atZone(CET)
+ .toInstant()))
+ // https://issues.apache.org/jira/browse/FLINK-24446 Fractional seconds are
+ // lost
+ .fromCase(
+ STRING(),
+ CET_CONTEXT,
+ StringData.fromString("2021-09-27 12:34:56.123456789"),
+ TimestampData.fromInstant(
+ LocalDateTime.of(2021, 9, 27, 12, 34, 56, 0)
+ .atZone(CET)
+ .toInstant())),
CastTestSpecBuilder.testCastTo(STRING())
.fromCase(STRING(), null, null)
.fromCase(
@@ -478,7 +611,60 @@ class CastRulesTest {
RawValueData.fromObject(
LocalDateTime.parse("2020-11-11T18:08:01.123")),
StringData.fromString("2020-11-11T18:08:01.123")),
+ CastTestSpecBuilder.testCastTo(BOOLEAN())
+ .fromCase(BOOLEAN(), null, null)
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fromCase(CHAR(4), StringData.fromString("true"), true)
+ .fromCase(VARCHAR(5), StringData.fromString("FalsE"), false)
+ .fail(STRING(), StringData.fromString("Apache Flink"), TableException.class)
+ .fromCase(STRING(), StringData.fromString("TRUE"), true)
+ .fail(STRING(), StringData.fromString(""), TableException.class),
+ CastTestSpecBuilder.testCastTo(BINARY(2))
+ .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 2
+ // bytes
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache"),
+ new byte[] {65, 112, 97, 99, 104, 101}),
+ CastTestSpecBuilder.testCastTo(VARBINARY(4))
+ .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ new byte[] {70, 108, 105, 110, 107})
+ // https://issues.apache.org/jira/browse/FLINK-24419 - not trimmed to 4
+ // bytes
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache"),
+ new byte[] {65, 112, 97, 99, 104, 101}),
+ CastTestSpecBuilder.testCastTo(BYTES())
+ .fromCase(CHAR(3), StringData.fromString("foo"), new byte[] {102, 111, 111})
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ new byte[] {70, 108, 105, 110, 107})
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache"),
+ new byte[] {65, 112, 97, 99, 104, 101}),
CastTestSpecBuilder.testCastTo(DECIMAL(5, 3))
+ .fail(CHAR(3), StringData.fromString("foo"), TableException.class)
+ .fail(VARCHAR(5), StringData.fromString("Flink"), TableException.class)
+ .fail(STRING(), StringData.fromString("Apache"), TableException.class)
+ .fromCase(
+ STRING(),
+ StringData.fromString("1.234"),
+ DecimalData.fromBigDecimal(new BigDecimal("1.234"), 5, 3))
+ .fromCase(
+ STRING(),
+ StringData.fromString("1.2"),
+ DecimalData.fromBigDecimal(new BigDecimal("1.200"), 5, 3))
.fromCase(
DECIMAL(4, 3),
DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
@@ -641,12 +827,21 @@ class CastRulesTest {
this.inputTypes.add(dataType);
this.assertionExecutors.add(
executor -> {
- assertEquals(target, executor.cast(src));
- // Run twice to make sure rules are reusable without causing issues
- assertEquals(
- target,
- executor.cast(src),
- "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+ if (target instanceof byte[]) {
+ assertArrayEquals((byte[]) target, (byte[]) executor.cast(src));
+ // Run twice to make sure rules are reusable without causing issues
+ assertArrayEquals(
+ (byte[]) target,
+ (byte[]) executor.cast(src),
+ "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+ } else {
+ assertEquals(target, executor.cast(src));
+ // Run twice to make sure rules are reusable without causing issues
+ assertEquals(
+ target,
+ executor.cast(src),
+ "Error when reusing the rule. Perhaps there is some state that needs to be reset");
+ }
});
this.descriptions.add("{" + src + " => " + target + "}");
this.castContexts.add(castContext);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
index 2aebca5..e534337 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
@@ -18,18 +18,24 @@
package org.apache.flink.table.data.binary;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.util.SegmentsUtil;
import org.apache.flink.table.runtime.util.StringUtf8Utils;
+import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.EncodingUtils;
import java.math.BigDecimal;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.time.DateTimeException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -144,12 +150,16 @@ public class BinaryStringDataUtil {
return substrings.toArray(new BinaryStringData[0]);
}
- /** Decide boolean representation of a string. */
- public static Boolean toBooleanSQL(BinaryStringData str) {
+ /** Parses a {@link BinaryStringData} to boolean; throws {@link TableException} if unrecognized. */
+ public static boolean toBoolean(BinaryStringData str) throws TableException {
BinaryStringData lowerCase = str.toLowerCase();
- return TRUE_STRINGS.contains(lowerCase)
- ? Boolean.TRUE
- : (FALSE_STRINGS.contains(lowerCase) ? Boolean.FALSE : null);
+ if (TRUE_STRINGS.contains(lowerCase)) {
+ return true;
+ }
+ if (FALSE_STRINGS.contains(lowerCase)) {
+ return false;
+ }
+ throw new TableException("Cannot parse '" + str + "' as BOOLEAN.");
}
/** Calculate the hash value of the given bytes use {@link MessageDigest}. */
@@ -168,21 +178,30 @@ public class BinaryStringDataUtil {
}
/**
- * Parses this BinaryStringData to DecimalData.
+ * Parses a {@link BinaryStringData} to {@link DecimalData}.
*
- * @return DecimalData value if the parsing was successful, or null if overflow
- * @throws NumberFormatException if the parsing failed.
+ * @return the parsed DecimalData; a NumberFormatException is thrown on malformed input or overflow.
*/
- public static DecimalData toDecimal(BinaryStringData str, int precision, int scale) {
+ public static DecimalData toDecimal(BinaryStringData str, int precision, int scale)
+ throws NumberFormatException {
str.ensureMaterialized();
+ DecimalData data;
+
if (DecimalDataUtils.isByteArrayDecimal(precision)
|| DecimalDataUtils.isByteArrayDecimal(str.getSizeInBytes())) {
- return toBigPrecisionDecimal(str, precision, scale);
+ data = toBigPrecisionDecimal(str, precision, scale);
+ } else {
+ int sizeInBytes = str.getSizeInBytes();
+ data =
+ toDecimalFromBytes(
+ precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
}
- int sizeInBytes = str.getSizeInBytes();
- return toDecimalFromBytes(precision, scale, getTmpBytes(str, sizeInBytes), 0, sizeInBytes);
+ if (data == null) {
+ throw numberFormatExceptionFor(str, "Overflow.");
+ }
+ return data;
}
private static DecimalData toDecimalFromBytes(
@@ -353,12 +372,9 @@ public class BinaryStringDataUtil {
break;
}
}
- try {
- BigDecimal bd = new BigDecimal(chars, start, end - start);
- return DecimalData.fromBigDecimal(bd, precision, scale);
- } catch (NumberFormatException nfe) {
- return null;
- }
+
+ BigDecimal bd = new BigDecimal(chars, start, end - start);
+ return DecimalData.fromBigDecimal(bd, precision, scale);
}
}
@@ -371,14 +387,12 @@ public class BinaryStringDataUtil {
* Long.MIN_VALUE is '-9223372036854775808'.
*
* <p>This code is mostly copied from LazyLong.parseLong in Hive.
- *
- * @return Long value if the parsing was successful else null.
*/
- public static Long toLong(BinaryStringData str) {
+ public static long toLong(BinaryStringData str) throws NumberFormatException {
int sizeInBytes = str.getSizeInBytes();
byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
if (sizeInBytes == 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Input is empty.");
}
int i = 0;
@@ -387,7 +401,7 @@ public class BinaryStringDataUtil {
if (negative || b == '+') {
i++;
if (sizeInBytes == 1) {
- return null;
+ throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
}
}
@@ -409,7 +423,7 @@ public class BinaryStringDataUtil {
if (b >= '0' && b <= '9') {
digit = b - '0';
} else {
- return null;
+ throw numberFormatExceptionFor(str, "Invalid character found.");
}
// We are going to process the new digit and accumulate the result. However, before
@@ -417,7 +431,7 @@ public class BinaryStringDataUtil {
// stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
// than minValue, and we can stop.
if (result < stopValue) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
result = result * radix - digit;
@@ -425,7 +439,7 @@ public class BinaryStringDataUtil {
// stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
// If result overflows, we should stop.
if (result > 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
}
@@ -435,7 +449,7 @@ public class BinaryStringDataUtil {
while (i < sizeInBytes) {
byte currentByte = tmpBytes[i];
if (currentByte < '0' || currentByte > '9') {
- return null;
+ throw numberFormatExceptionFor(str, "Invalid character found.");
}
i++;
}
@@ -443,7 +457,7 @@ public class BinaryStringDataUtil {
if (!negative) {
result = -result;
if (result < 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
}
return result;
@@ -461,14 +475,12 @@ public class BinaryStringDataUtil {
*
* <p>Note that, this method is almost same as `toLong`, but we leave it duplicated for
* performance reasons, like Hive does.
- *
- * @return Integer value if the parsing was successful else null.
*/
- public static Integer toInt(BinaryStringData str) {
+ public static int toInt(BinaryStringData str) throws NumberFormatException {
int sizeInBytes = str.getSizeInBytes();
byte[] tmpBytes = getTmpBytes(str, sizeInBytes);
if (sizeInBytes == 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Input is empty.");
}
int i = 0;
@@ -477,7 +489,7 @@ public class BinaryStringDataUtil {
if (negative || b == '+') {
i++;
if (sizeInBytes == 1) {
- return null;
+ throw numberFormatExceptionFor(str, "Input has only positive or negative symbol.");
}
}
@@ -499,7 +511,7 @@ public class BinaryStringDataUtil {
if (b >= '0' && b <= '9') {
digit = b - '0';
} else {
- return null;
+ throw numberFormatExceptionFor(str, "Invalid character found.");
}
// We are going to process the new digit and accumulate the result. However, before
@@ -507,7 +519,7 @@ public class BinaryStringDataUtil {
// stopValue(Long.MIN_VALUE / radix), then result * 10 will definitely be smaller
// than minValue, and we can stop.
if (result < stopValue) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
result = result * radix - digit;
@@ -515,7 +527,7 @@ public class BinaryStringDataUtil {
// stopValue(Long.MIN_VALUE / radix), we can just use `result > 0` to check overflow.
// If result overflows, we should stop.
if (result > 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
}
@@ -525,7 +537,7 @@ public class BinaryStringDataUtil {
while (i < sizeInBytes) {
byte currentByte = tmpBytes[i];
if (currentByte < '0' || currentByte > '9') {
- return null;
+ throw numberFormatExceptionFor(str, "Invalid character found.");
}
i++;
}
@@ -533,48 +545,77 @@ public class BinaryStringDataUtil {
if (!negative) {
result = -result;
if (result < 0) {
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
}
return result;
}
- public static Short toShort(BinaryStringData str) {
- Integer intValue = toInt(str);
- if (intValue != null) {
- short result = intValue.shortValue();
- if (result == intValue) {
- return result;
- }
+ public static short toShort(BinaryStringData str) throws NumberFormatException {
+ int intValue = toInt(str);
+ short result = (short) intValue;
+ if (result == intValue) {
+ return result;
}
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
- public static Byte toByte(BinaryStringData str) {
- Integer intValue = toInt(str);
- if (intValue != null) {
- byte result = intValue.byteValue();
- if (result == intValue) {
- return result;
- }
+ public static byte toByte(BinaryStringData str) throws NumberFormatException {
+ int intValue = toInt(str);
+ byte result = (byte) intValue;
+ if (result == intValue) {
+ return result;
}
- return null;
+ throw numberFormatExceptionFor(str, "Overflow.");
}
- public static Double toDouble(BinaryStringData str) {
- try {
- return Double.valueOf(str.toString());
- } catch (NumberFormatException e) {
- return null;
+ public static double toDouble(BinaryStringData str) throws NumberFormatException {
+ return Double.parseDouble(str.toString());
+ }
+
+ public static float toFloat(BinaryStringData str) throws NumberFormatException {
+ return Float.parseFloat(str.toString());
+ }
+
+ private static NumberFormatException numberFormatExceptionFor(StringData input, String reason) {
+ return new NumberFormatException("For input string: '" + input + "'. " + reason);
+ }
+
+ public static int toDate(BinaryStringData input) throws DateTimeException {
+ Integer date = DateTimeUtils.dateStringToUnixDate(input.toString());
+ if (date == null) {
+ throw new DateTimeException("For input string: '" + input + "'.");
}
+
+ return date;
}
- public static Float toFloat(BinaryStringData str) {
- try {
- return Float.valueOf(str.toString());
- } catch (NumberFormatException e) {
- return null;
+ public static int toTime(BinaryStringData input) throws DateTimeException {
+ Integer date = DateTimeUtils.timeStringToUnixDate(input.toString());
+ if (date == null) {
+ throw new DateTimeException("For input string: '" + input + "'.");
+ }
+
+ return date;
+ }
+
+ public static TimestampData toTimestamp(BinaryStringData input) throws DateTimeException {
+ TimestampData timestamp = DateTimeUtils.toTimestampData(input.toString());
+ if (timestamp == null) {
+ throw new DateTimeException("For input string: '" + input + "'.");
}
+
+ return timestamp;
+ }
+
+ public static TimestampData toTimestamp(BinaryStringData input, TimeZone timeZone)
+ throws DateTimeException {
+ Long timestamp = DateTimeUtils.toTimestamp(input.toString(), timeZone);
+ if (timestamp == null) {
+ throw new DateTimeException("For input string: '" + input + "'.");
+ }
+
+ return TimestampData.fromEpochMillis(timestamp);
}
/**
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
index 4a8741f..bfb979d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
@@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
/**
@@ -420,29 +421,29 @@ public class BinaryStringDataTest {
@Test
public void testToNumeric() {
// Test to integer.
- assertEquals(Byte.valueOf("123"), toByte(fromString("123")));
- assertEquals(Byte.valueOf("123"), toByte(fromString("+123")));
- assertEquals(Byte.valueOf("-123"), toByte(fromString("-123")));
+ assertEquals(Byte.parseByte("123"), toByte(fromString("123")));
+ assertEquals(Byte.parseByte("123"), toByte(fromString("+123")));
+ assertEquals(Byte.parseByte("-123"), toByte(fromString("-123")));
- assertEquals(Short.valueOf("123"), toShort(fromString("123")));
- assertEquals(Short.valueOf("123"), toShort(fromString("+123")));
- assertEquals(Short.valueOf("-123"), toShort(fromString("-123")));
+ assertEquals(Short.parseShort("123"), toShort(fromString("123")));
+ assertEquals(Short.parseShort("123"), toShort(fromString("+123")));
+ assertEquals(Short.parseShort("-123"), toShort(fromString("-123")));
- assertEquals(Integer.valueOf("123"), toInt(fromString("123")));
- assertEquals(Integer.valueOf("123"), toInt(fromString("+123")));
- assertEquals(Integer.valueOf("-123"), toInt(fromString("-123")));
+ assertEquals(Integer.parseInt("123"), toInt(fromString("123")));
+ assertEquals(Integer.parseInt("123"), toInt(fromString("+123")));
+ assertEquals(Integer.parseInt("-123"), toInt(fromString("-123")));
- assertEquals(Long.valueOf("1234567890"), toLong(fromString("1234567890")));
- assertEquals(Long.valueOf("+1234567890"), toLong(fromString("+1234567890")));
- assertEquals(Long.valueOf("-1234567890"), toLong(fromString("-1234567890")));
+ assertEquals(Long.parseLong("1234567890"), toLong(fromString("1234567890")));
+ assertEquals(Long.parseLong("+1234567890"), toLong(fromString("+1234567890")));
+ assertEquals(Long.parseLong("-1234567890"), toLong(fromString("-1234567890")));
// Test decimal string to integer.
- assertEquals(Integer.valueOf("123"), toInt(fromString("123.456789")));
- assertEquals(Long.valueOf("123"), toLong(fromString("123.456789")));
+ assertEquals(Integer.parseInt("123"), toInt(fromString("123.456789")));
+ assertEquals(Long.parseLong("123"), toLong(fromString("123.456789")));
// Test negative cases.
- assertNull(toInt(fromString("1a3.456789")));
- assertNull(toInt(fromString("123.a56789")));
+ assertThrows(NumberFormatException.class, () -> toInt(fromString("1a3.456789")));
+ assertThrows(NumberFormatException.class, () -> toInt(fromString("123.a56789")));
// Test composite in BinaryRowData.
BinaryRowData row = new BinaryRowData(20);
@@ -453,10 +454,10 @@ public class BinaryStringDataTest {
writer.writeString(3, BinaryStringData.fromString("123456789"));
writer.complete();
- assertEquals(Byte.valueOf("1"), toByte(((BinaryStringData) row.getString(0))));
- assertEquals(Short.valueOf("123"), toShort(((BinaryStringData) row.getString(1))));
- assertEquals(Integer.valueOf("12345"), toInt(((BinaryStringData) row.getString(2))));
- assertEquals(Long.valueOf("123456789"), toLong(((BinaryStringData) row.getString(3))));
+ assertEquals(Byte.parseByte("1"), toByte(((BinaryStringData) row.getString(0))));
+ assertEquals(Short.parseShort("123"), toShort(((BinaryStringData) row.getString(1))));
+ assertEquals(Integer.parseInt("12345"), toInt(((BinaryStringData) row.getString(2))));
+ assertEquals(Long.parseLong("123456789"), toLong(((BinaryStringData) row.getString(3))));
}
@Test