You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/03 03:51:12 UTC

[flink-table-store] branch master updated: [FLINK-30423] Improve CastExecutor and CastExecutors

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b22ed197 [FLINK-30423] Improve CastExecutor and CastExecutors
b22ed197 is described below

commit b22ed19740f427852f404d93570e3b0cff1b09ef
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jan 3 11:51:02 2023 +0800

    [FLINK-30423] Improve CastExecutor and CastExecutors
---
 .../table/store/file/casting/CastExecutor.java     |  14 +-
 .../table/store/file/casting/CastExecutors.java    | 286 +++++++++------------
 .../table/store/file/casting/CastExecutorTest.java | 151 ++++-------
 3 files changed, 167 insertions(+), 284 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java
index 74c0a2f1..3dbb04cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutor.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.table.store.file.casting;
 
-import org.apache.flink.table.api.TableException;
-
-import javax.annotation.Nonnull;
-
 /**
  * Interface to model a function that performs the casting of a value from one type to another.
  * Copied from flink.
@@ -30,11 +26,7 @@ import javax.annotation.Nonnull;
  * @param <OUT> Output internal type
  */
 public interface CastExecutor<IN, OUT> {
-    /**
-     * Cast the input value. The output is null only and only if the input is null. The method
-     * throws an exception if something goes wrong when casting.
-     *
-     * @param value Input value.
-     */
-    OUT cast(@Nonnull IN value) throws TableException;
+
+    /** Cast the input value. */
+    OUT cast(IN value);
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
index bbbddb47..d46c68fc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/casting/CastExecutors.java
@@ -45,6 +45,7 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
 
 /** Cast executors for input type and output type. */
 public class CastExecutors {
+
     /**
      * Resolve a {@link CastExecutor} for the provided input type and target type. Returns null if
      * no rule can be resolved.
@@ -65,209 +66,162 @@ public class CastExecutors {
                 {
                     switch (outputType.getTypeRoot()) {
                         case TINYINT:
-                            {
-                                return value -> ((Number) value).byteValue();
-                            }
+                            return value -> ((Number) value).byteValue();
                         case SMALLINT:
-                            {
-                                return value -> ((Number) value).shortValue();
-                            }
+                            return value -> ((Number) value).shortValue();
                         case INTEGER:
-                            {
-                                return value -> ((Number) value).intValue();
-                            }
+                            return value -> ((Number) value).intValue();
                         case BIGINT:
-                            {
-                                return value -> ((Number) value).longValue();
-                            }
+                            return value -> ((Number) value).longValue();
                         case FLOAT:
-                            {
-                                return value -> ((Number) value).floatValue();
-                            }
+                            return value -> ((Number) value).floatValue();
                         case DOUBLE:
-                            {
-                                return value -> ((Number) value).doubleValue();
-                            }
+                            return value -> ((Number) value).doubleValue();
                         case DECIMAL:
-                            {
-                                final DecimalType decimalType = (DecimalType) outputType;
-                                return value -> {
-                                    final Number number = (Number) value;
-                                    switch (inputType.getTypeRoot()) {
-                                        case TINYINT:
-                                        case SMALLINT:
-                                        case INTEGER:
-                                        case BIGINT:
-                                            {
-                                                return DecimalDataUtils.castFrom(
-                                                        number.longValue(),
-                                                        decimalType.getPrecision(),
-                                                        decimalType.getScale());
-                                            }
-                                        default:
-                                            {
-                                                return DecimalDataUtils.castFrom(
-                                                        number.doubleValue(),
-                                                        decimalType.getPrecision(),
-                                                        decimalType.getScale());
-                                            }
-                                    }
-                                };
-                            }
+                            final DecimalType decimalType = (DecimalType) outputType;
+                            return value -> {
+                                final Number number = (Number) value;
+                                switch (inputType.getTypeRoot()) {
+                                    case TINYINT:
+                                    case SMALLINT:
+                                    case INTEGER:
+                                    case BIGINT:
+                                        {
+                                            return DecimalDataUtils.castFrom(
+                                                    number.longValue(),
+                                                    decimalType.getPrecision(),
+                                                    decimalType.getScale());
+                                        }
+                                    default:
+                                        {
+                                            return DecimalDataUtils.castFrom(
+                                                    number.doubleValue(),
+                                                    decimalType.getPrecision(),
+                                                    decimalType.getScale());
+                                        }
+                                }
+                            };
                         default:
-                            {
-                                return null;
-                            }
+                            return null;
                     }
                 }
             case DECIMAL:
                 {
                     switch (outputType.getTypeRoot()) {
                         case TINYINT:
-                            {
-                                return value ->
-                                        (byte) DecimalDataUtils.castToIntegral((DecimalData) value);
-                            }
+                            return value ->
+                                    (byte) DecimalDataUtils.castToIntegral((DecimalData) value);
                         case SMALLINT:
-                            {
-                                return value ->
-                                        (short)
-                                                DecimalDataUtils.castToIntegral(
-                                                        (DecimalData) value);
-                            }
+                            return value ->
+                                    (short) DecimalDataUtils.castToIntegral((DecimalData) value);
                         case INTEGER:
-                            {
-                                return value ->
-                                        (int) DecimalDataUtils.castToIntegral((DecimalData) value);
-                            }
+                            return value ->
+                                    (int) DecimalDataUtils.castToIntegral((DecimalData) value);
                         case BIGINT:
-                            {
-                                return value ->
-                                        DecimalDataUtils.castToIntegral((DecimalData) value);
-                            }
+                            return value -> DecimalDataUtils.castToIntegral((DecimalData) value);
                         case FLOAT:
-                            {
-                                return value ->
-                                        (float) DecimalDataUtils.doubleValue((DecimalData) value);
-                            }
+                            return value ->
+                                    (float) DecimalDataUtils.doubleValue((DecimalData) value);
                         case DOUBLE:
-                            {
-                                return value -> DecimalDataUtils.doubleValue((DecimalData) value);
-                            }
+                            return value -> DecimalDataUtils.doubleValue((DecimalData) value);
                         case DECIMAL:
-                            {
-                                DecimalType decimalType = (DecimalType) outputType;
-                                return value ->
-                                        DecimalDataUtils.castToDecimal(
-                                                (DecimalData) value,
-                                                decimalType.getPrecision(),
-                                                decimalType.getScale());
-                            }
+                            DecimalType decimalType = (DecimalType) outputType;
+                            return value ->
+                                    DecimalDataUtils.castToDecimal(
+                                            (DecimalData) value,
+                                            decimalType.getPrecision(),
+                                            decimalType.getScale());
                         default:
-                            {
-                                return null;
-                            }
+                            return null;
                     }
                 }
             case CHAR:
             case VARCHAR:
-                {
-                    if (outputType.getTypeRoot() == CHAR || outputType.getTypeRoot() == VARCHAR) {
-                        final boolean targetCharType = outputType.getTypeRoot() == CHAR;
-                        final int targetLength = getStringLength(outputType);
-                        return value -> {
-                            StringData result;
-                            String strVal = value.toString();
-                            BinaryStringData strData = BinaryStringData.fromString(strVal);
-                            if (strData.numChars() > targetLength) {
-                                result =
-                                        BinaryStringData.fromString(
-                                                strVal.substring(0, targetLength));
-                            } else {
-                                if (strData.numChars() < targetLength) {
-                                    if (targetCharType) {
-                                        int padLength = targetLength - strData.numChars();
-                                        BinaryStringData padString =
-                                                BinaryStringData.blankString(padLength);
-                                        result = BinaryStringDataUtil.concat(strData, padString);
-                                    } else {
-                                        result = strData;
-                                    }
+                if (outputType.getTypeRoot() == CHAR || outputType.getTypeRoot() == VARCHAR) {
+                    final boolean targetCharType = outputType.getTypeRoot() == CHAR;
+                    final int targetLength = getStringLength(outputType);
+                    return value -> {
+                        StringData result;
+                        String strVal = value.toString();
+                        BinaryStringData strData = BinaryStringData.fromString(strVal);
+                        if (strData.numChars() > targetLength) {
+                            result = BinaryStringData.fromString(strVal.substring(0, targetLength));
+                        } else {
+                            if (strData.numChars() < targetLength) {
+                                if (targetCharType) {
+                                    int padLength = targetLength - strData.numChars();
+                                    BinaryStringData padString =
+                                            BinaryStringData.blankString(padLength);
+                                    result = BinaryStringDataUtil.concat(strData, padString);
                                 } else {
                                     result = strData;
                                 }
-                            }
-
-                            return result;
-                        };
-                    } else if (outputType.getTypeRoot() == VARBINARY) {
-                        final int targetLength = getBinaryLength(outputType);
-                        return value -> {
-                            byte[] byteArrayTerm = ((StringData) value).toBytes();
-                            if (byteArrayTerm.length <= targetLength) {
-                                return byteArrayTerm;
                             } else {
-                                return Arrays.copyOf(byteArrayTerm, targetLength);
+                                result = strData;
                             }
-                        };
-                    }
-                    return null;
+                        }
+
+                        return result;
+                    };
+                } else if (outputType.getTypeRoot() == VARBINARY) {
+                    final int targetLength = getBinaryLength(outputType);
+                    return value -> {
+                        byte[] byteArrayTerm = ((StringData) value).toBytes();
+                        if (byteArrayTerm.length <= targetLength) {
+                            return byteArrayTerm;
+                        } else {
+                            return Arrays.copyOf(byteArrayTerm, targetLength);
+                        }
+                    };
                 }
+                return null;
             case BINARY:
-                {
-                    if (outputType.getTypeRoot() == BINARY) {
-                        final int targetLength = getBinaryLength(outputType);
-                        return value ->
-                                ((((byte[]) value).length == targetLength)
-                                        ? value
-                                        : Arrays.copyOf((byte[]) value, targetLength));
-                    }
-                    return null;
+                if (outputType.getTypeRoot() == BINARY) {
+                    final int targetLength = getBinaryLength(outputType);
+                    return value ->
+                            ((((byte[]) value).length == targetLength)
+                                    ? value
+                                    : Arrays.copyOf((byte[]) value, targetLength));
                 }
+                return null;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                {
-                    switch (outputType.getTypeRoot()) {
-                        case DATE:
-                            {
-                                return value ->
-                                        (int)
-                                                (((TimestampData) value).getMillisecond()
-                                                        / DateTimeUtils.MILLIS_PER_DAY);
-                            }
-                        case TIMESTAMP_WITHOUT_TIME_ZONE:
-                            {
-                                return value ->
-                                        DateTimeUtils.truncate(
-                                                (TimestampData) value,
-                                                ((TimestampType) outputType).getPrecision());
-                            }
-                        case TIME_WITHOUT_TIME_ZONE:
-                            {
-                                return value ->
-                                        (int)
-                                                (((TimestampData) value).getMillisecond()
-                                                        % DateTimeUtils.MILLIS_PER_DAY);
-                            }
-                        default:
-                            {
-                                return null;
-                            }
-                    }
+                switch (outputType.getTypeRoot()) {
+                    case DATE:
+                        {
+                            return value ->
+                                    (int)
+                                            (((TimestampData) value).getMillisecond()
+                                                    / DateTimeUtils.MILLIS_PER_DAY);
+                        }
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                        {
+                            return value ->
+                                    DateTimeUtils.truncate(
+                                            (TimestampData) value,
+                                            ((TimestampType) outputType).getPrecision());
+                        }
+                    case TIME_WITHOUT_TIME_ZONE:
+                        {
+                            return value ->
+                                    (int)
+                                            (((TimestampData) value).getMillisecond()
+                                                    % DateTimeUtils.MILLIS_PER_DAY);
+                        }
+                    default:
+                        {
+                            return null;
+                        }
                 }
             case TIME_WITHOUT_TIME_ZONE:
-                {
-                    if (outputType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) {
-                        return value ->
-                                (int)
-                                        (((TimestampData) value).getMillisecond()
-                                                % DateTimeUtils.MILLIS_PER_DAY);
-                    }
-                    return null;
+                if (outputType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) {
+                    return value ->
+                            (int)
+                                    (((TimestampData) value).getMillisecond()
+                                            % DateTimeUtils.MILLIS_PER_DAY);
                 }
+                return null;
             default:
-                {
-                    return null;
-                }
+                return null;
         }
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
index a5afc7d3..ea5ffeeb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/casting/CastExecutorTest.java
@@ -43,124 +43,79 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CastExecutor}. */
 public class CastExecutorTest {
+
     @Test
     public void testNumericToNumeric() {
         // byte to other numeric
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new SmallIntType(false)),
+                CastExecutors.resolve(new TinyIntType(false), new SmallIntType(false)),
                 (byte) 1,
                 (short) 1);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new IntType(false)),
-                (byte) 1,
-                1);
+                CastExecutors.resolve(new TinyIntType(false), new IntType(false)), (byte) 1, 1);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new BigIntType(false)),
-                (byte) 1,
-                1L);
+                CastExecutors.resolve(new TinyIntType(false), new BigIntType(false)), (byte) 1, 1L);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new FloatType(false)),
-                (byte) 1,
-                1F);
+                CastExecutors.resolve(new TinyIntType(false), new FloatType(false)), (byte) 1, 1F);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new DoubleType(false)),
-                (byte) 1,
-                1D);
+                CastExecutors.resolve(new TinyIntType(false), new DoubleType(false)), (byte) 1, 1D);
 
         // short to other numeric
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new SmallIntType(false), new IntType(false)),
-                (short) 1,
-                1);
+                CastExecutors.resolve(new SmallIntType(false), new IntType(false)), (short) 1, 1);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new SmallIntType(false), new BigIntType(false)),
+                CastExecutors.resolve(new SmallIntType(false), new BigIntType(false)),
                 (short) 1,
                 1L);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new SmallIntType(false), new FloatType(false)),
+                CastExecutors.resolve(new SmallIntType(false), new FloatType(false)),
                 (short) 1,
                 1F);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new SmallIntType(false), new DoubleType(false)),
+                CastExecutors.resolve(new SmallIntType(false), new DoubleType(false)),
                 (short) 1,
                 1D);
 
         // int to other numeric
-        compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new IntType(false), new BigIntType(false)),
-                1,
-                1L);
-        compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new IntType(false), new FloatType(false)),
-                1,
-                1F);
-        compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new IntType(false), new DoubleType(false)),
-                1,
-                1D);
+        compareCastResult(CastExecutors.resolve(new IntType(false), new BigIntType(false)), 1, 1L);
+        compareCastResult(CastExecutors.resolve(new IntType(false), new FloatType(false)), 1, 1F);
+        compareCastResult(CastExecutors.resolve(new IntType(false), new DoubleType(false)), 1, 1D);
 
         // bigint to other numeric
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new BigIntType(false), new FloatType(false)),
-                1L,
-                1F);
+                CastExecutors.resolve(new BigIntType(false), new FloatType(false)), 1L, 1F);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new BigIntType(false), new DoubleType(false)),
-                1L,
-                1D);
+                CastExecutors.resolve(new BigIntType(false), new DoubleType(false)), 1L, 1D);
 
         // float to double
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new FloatType(false), new DoubleType(false)),
-                1F,
-                1D);
+                CastExecutors.resolve(new FloatType(false), new DoubleType(false)), 1F, 1D);
     }
 
     @Test
     public void testNumericToDecimal() {
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TinyIntType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new TinyIntType(false), new DecimalType(10, 2)),
                 (byte) 1,
                 DecimalDataUtils.castFrom(1, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new SmallIntType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new SmallIntType(false), new DecimalType(10, 2)),
                 (short) 1,
                 DecimalDataUtils.castFrom(1, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new IntType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new IntType(false), new DecimalType(10, 2)),
                 1,
                 DecimalDataUtils.castFrom(1, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new BigIntType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new BigIntType(false), new DecimalType(10, 2)),
                 1L,
                 DecimalDataUtils.castFrom(1, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new FloatType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new FloatType(false), new DecimalType(10, 2)),
                 1.23456F,
                 DecimalDataUtils.castFrom(1.23456D, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new DoubleType(false), new DecimalType(10, 2)),
+                CastExecutors.resolve(new DoubleType(false), new DecimalType(10, 2)),
                 1.23456D,
                 DecimalDataUtils.castFrom(1.23456D, 10, 2));
     }
@@ -168,13 +123,11 @@ public class CastExecutorTest {
     @Test
     public void testDecimalToDecimal() {
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new DecimalType(10, 4), new DecimalType(10, 2)),
+                CastExecutors.resolve(new DecimalType(10, 4), new DecimalType(10, 2)),
                 DecimalDataUtils.castFrom(1.23456D, 10, 4),
                 DecimalDataUtils.castFrom(1.23456D, 10, 2));
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new DecimalType(10, 2), new DecimalType(10, 4)),
+                CastExecutors.resolve(new DecimalType(10, 2), new DecimalType(10, 4)),
                 DecimalDataUtils.castFrom(1.23456D, 10, 2),
                 DecimalDataUtils.castFrom(1.2300D, 10, 4));
     }
@@ -182,13 +135,11 @@ public class CastExecutorTest {
     @Test
     public void testDecimalToNumeric() {
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new DecimalType(10, 4), new FloatType(false)),
+                CastExecutors.resolve(new DecimalType(10, 4), new FloatType(false)),
                 DecimalDataUtils.castFrom(1.23456D, 10, 4),
                 1.2346F);
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new DecimalType(10, 2), new DoubleType(false)),
+                CastExecutors.resolve(new DecimalType(10, 2), new DoubleType(false)),
                 DecimalDataUtils.castFrom(1.23456D, 10, 2),
                 1.23D);
     }
@@ -197,57 +148,49 @@ public class CastExecutorTest {
     public void testStringToString() {
         // varchar(10) to varchar(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new VarCharType(5)),
+                CastExecutors.resolve(new VarCharType(10), new VarCharType(5)),
                 StringData.fromString("1234567890"),
                 StringData.fromString("12345"));
 
         // varchar(10) to varchar(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new VarCharType(20)),
+                CastExecutors.resolve(new VarCharType(10), new VarCharType(20)),
                 StringData.fromString("1234567890"),
                 StringData.fromString("1234567890"));
 
         // varchar(10) to char(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new CharType(5)),
+                CastExecutors.resolve(new VarCharType(10), new CharType(5)),
                 StringData.fromString("1234567890"),
                 StringData.fromString("12345"));
 
         // varchar(10) to char(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new CharType(20)),
+                CastExecutors.resolve(new VarCharType(10), new CharType(20)),
                 StringData.fromString("1234567890"),
                 StringData.fromString("1234567890          "));
 
         // char(10) to varchar(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new CharType(10), new VarCharType(5)),
+                CastExecutors.resolve(new CharType(10), new VarCharType(5)),
                 StringData.fromString("1234567890"),
                 StringData.fromString("12345"));
 
         // char(10) to varchar(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new CharType(10), new VarCharType(20)),
+                CastExecutors.resolve(new CharType(10), new VarCharType(20)),
                 StringData.fromString("12345678  "),
                 StringData.fromString("12345678  "));
 
         // char(10) to char(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new CharType(10), new CharType(5)),
+                CastExecutors.resolve(new CharType(10), new CharType(5)),
                 StringData.fromString("12345678  "),
                 StringData.fromString("12345"));
 
         // char(10) to char(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new CharType(10), new CharType(20)),
+                CastExecutors.resolve(new CharType(10), new CharType(20)),
                 StringData.fromString("12345678  "),
                 StringData.fromString("12345678            "));
     }
@@ -256,15 +199,13 @@ public class CastExecutorTest {
     public void testStringToBinary() {
         // string(10) to binary(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new VarBinaryType(5)),
+                CastExecutors.resolve(new VarCharType(10), new VarBinaryType(5)),
                 StringData.fromString("12345678"),
                 "12345".getBytes());
 
         // string(10) to binary(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new VarCharType(10), new VarBinaryType(20)),
+                CastExecutors.resolve(new VarCharType(10), new VarBinaryType(20)),
                 StringData.fromString("12345678"),
                 "12345678".getBytes());
     }
@@ -273,15 +214,13 @@ public class CastExecutorTest {
     public void testBinaryToBinary() {
         // binary(10) to binary(5)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new BinaryType(10), new BinaryType(5)),
+                CastExecutors.resolve(new BinaryType(10), new BinaryType(5)),
                 "1234567890".getBytes(),
                 "12345".getBytes());
 
         // binary(10) to binary(20)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new BinaryType(10), new BinaryType(20)),
+                CastExecutors.resolve(new BinaryType(10), new BinaryType(20)),
                 "12345678".getBytes(),
                 new byte[] {49, 50, 51, 52, 53, 54, 55, 56, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
     }
@@ -293,27 +232,25 @@ public class CastExecutorTest {
 
         // timestamp(5) to timestamp(2)
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TimestampType(5), new TimestampType(2)),
+                CastExecutors.resolve(new TimestampType(5), new TimestampType(2)),
                 timestampData,
                 DateTimeUtils.truncate(TimestampData.fromEpochMillis(mills), 2));
 
         // timestamp to date
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TimestampType(5), new DateType()),
+                CastExecutors.resolve(new TimestampType(5), new DateType()),
                 TimestampData.fromEpochMillis(mills),
                 (int) (mills / DateTimeUtils.MILLIS_PER_DAY));
 
         // timestamp to time
         compareCastResult(
-                (CastExecutor<Object, Object>)
-                        CastExecutors.resolve(new TimestampType(5), new TimeType(2)),
+                CastExecutors.resolve(new TimestampType(5), new TimeType(2)),
                 TimestampData.fromEpochMillis(mills),
                 (int) (mills % DateTimeUtils.MILLIS_PER_DAY));
     }
 
-    private void compareCastResult(CastExecutor<Object, Object> cast, Object input, Object output) {
-        assertThat(cast.cast(input)).isEqualTo(output);
+    @SuppressWarnings("rawtypes")
+    private void compareCastResult(CastExecutor<?, ?> cast, Object input, Object output) {
+        assertThat(((CastExecutor) cast).cast(input)).isEqualTo(output);
     }
 }