You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/13 03:33:02 UTC

[flink] branch master updated: [FLINK-12457][table-planner-blink] Remove char support in blink planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf911b3  [FLINK-12457][table-planner-blink] Remove char support in blink planner
cf911b3 is described below

commit cf911b34facc0b298dccf6d24348fe9a3ccd3b86
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Mon May 13 11:32:51 2019 +0800

    [FLINK-12457][table-planner-blink] Remove char support in blink planner
    
    This closes #8379
---
 .../flink/table/calcite/FlinkTypeFactory.scala     |  3 --
 .../apache/flink/table/codegen/CodeGenUtils.scala  | 11 +----
 .../table/codegen/calls/FunctionGenerator.scala    | 15 -------
 .../table/codegen/calls/ScalarOperatorGens.scala   |  4 --
 .../table/codegen/sort/SortCodeGenerator.scala     |  2 -
 .../flink/table/typeutils/TypeCheckUtils.scala     |  1 -
 .../flink/table/typeutils/TypeCoercion.scala       |  2 -
 .../flink/table/codegen/SortCodeGeneratorTest.java |  9 ----
 .../apache/flink/table/dataformat/BinaryArray.java | 34 --------------
 .../flink/table/dataformat/BinaryArrayWriter.java  |  7 ---
 .../apache/flink/table/dataformat/BinaryRow.java   | 14 ------
 .../flink/table/dataformat/BinaryRowWriter.java    |  5 ---
 .../flink/table/dataformat/BinaryWriter.java       |  4 --
 .../flink/table/dataformat/BoxedWrapperRow.java    | 16 -------
 .../table/dataformat/DataFormatConverters.java     | 46 -------------------
 .../apache/flink/table/dataformat/GenericRow.java  | 10 -----
 .../apache/flink/table/dataformat/JoinedRow.java   | 18 --------
 .../apache/flink/table/dataformat/NestedRow.java   | 13 ------
 .../flink/table/dataformat/TypeGetterSetters.java  | 12 -----
 .../flink/table/dataformat/UpdatableRow.java       | 10 -----
 .../apache/flink/table/runtime/sort/SortUtil.java  |  4 --
 .../java/org/apache/flink/table/type/CharType.java | 31 -------------
 .../apache/flink/table/type/InternalTypeUtils.java |  9 ++--
 .../org/apache/flink/table/type/InternalTypes.java |  2 -
 .../apache/flink/table/type/TypeConverters.java    |  3 --
 .../org/apache/flink/table/util/SegmentsUtil.java  | 52 ----------------------
 .../apache/flink/table/dataformat/BaseRowTest.java |  5 ---
 .../flink/table/dataformat/BinaryArrayTest.java    | 24 ----------
 .../flink/table/dataformat/BinaryRowTest.java      |  5 ---
 29 files changed, 5 insertions(+), 366 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 408f770..f8b9ebe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -85,9 +85,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
           case InternalTypes.BINARY => createSqlType(VARBINARY)
 
-          case InternalTypes.CHAR =>
-            throw new TableException("Character type is not supported.")
-
           case decimal: DecimalType =>
             createSqlType(DECIMAL, decimal.precision(), decimal.scale())
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 1713429..7a13af4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.util.MurmurHashUtil
 import org.apache.flink.types.Row
 
 import java.lang.reflect.Method
-import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
 import java.util.concurrent.atomic.AtomicInteger
 
 object CodeGenUtils {
@@ -118,7 +118,6 @@ object CodeGenUtils {
     case InternalTypes.FLOAT => "float"
     case InternalTypes.DOUBLE => "double"
     case InternalTypes.BOOLEAN => "boolean"
-    case InternalTypes.CHAR => "char"
 
     case InternalTypes.DATE => "int"
     case InternalTypes.TIME => "int"
@@ -138,7 +137,6 @@ object CodeGenUtils {
     case InternalTypes.FLOAT => className[JFloat]
     case InternalTypes.DOUBLE => className[JDouble]
     case InternalTypes.BOOLEAN => className[JBoolean]
-    case InternalTypes.CHAR => className[JChar]
 
     case InternalTypes.DATE => boxedTypeTermForType(InternalTypes.INT)
     case InternalTypes.TIME => boxedTypeTermForType(InternalTypes.INT)
@@ -171,7 +169,6 @@ object CodeGenUtils {
     case InternalTypes.DOUBLE => "-1.0d"
     case InternalTypes.BOOLEAN => "false"
     case InternalTypes.STRING => s"$BINARY_STRING.EMPTY_UTF8"
-    case InternalTypes.CHAR => "'\\0'"
 
     case _: DateType | InternalTypes.TIME => "-1"
     case _: TimestampType => "-1L"
@@ -195,7 +192,6 @@ object CodeGenUtils {
     case InternalTypes.LONG => s"${className[JLong]}.hashCode($term)"
     case InternalTypes.FLOAT => s"${className[JFloat]}.hashCode($term)"
     case InternalTypes.DOUBLE => s"${className[JDouble]}.hashCode($term)"
-    case InternalTypes.CHAR => s"${className[JChar]}.hashCode($term)"
     case InternalTypes.STRING => s"$term.hashCode()"
     case InternalTypes.BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
       s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
@@ -367,7 +363,6 @@ object CodeGenUtils {
       // primitive types
       case InternalTypes.BOOLEAN => s"$rowTerm.getBoolean($indexTerm)"
       case InternalTypes.BYTE => s"$rowTerm.getByte($indexTerm)"
-      case InternalTypes.CHAR => s"$rowTerm.getChar($indexTerm)"
       case InternalTypes.SHORT => s"$rowTerm.getShort($indexTerm)"
       case InternalTypes.INT => s"$rowTerm.getInt($indexTerm)"
       case InternalTypes.LONG => s"$rowTerm.getLong($indexTerm)"
@@ -500,7 +495,6 @@ object CodeGenUtils {
       case InternalTypes.FLOAT => s"$binaryRowTerm.setFloat($index, $fieldValTerm)"
       case InternalTypes.DOUBLE => s"$binaryRowTerm.setDouble($index, $fieldValTerm)"
       case InternalTypes.BOOLEAN => s"$binaryRowTerm.setBoolean($index, $fieldValTerm)"
-      case InternalTypes.CHAR =>  s"$binaryRowTerm.setChar($index, $fieldValTerm)"
       case _: DateType =>  s"$binaryRowTerm.setInt($index, $fieldValTerm)"
       case InternalTypes.TIME =>  s"$binaryRowTerm.setInt($index, $fieldValTerm)"
       case _: TimestampType =>  s"$binaryRowTerm.setLong($index, $fieldValTerm)"
@@ -526,7 +520,6 @@ object CodeGenUtils {
       case InternalTypes.FLOAT => s"$rowTerm.setFloat($indexTerm, $fieldTerm)"
       case InternalTypes.DOUBLE => s"$rowTerm.setDouble($indexTerm, $fieldTerm)"
       case InternalTypes.BOOLEAN => s"$rowTerm.setBoolean($indexTerm, $fieldTerm)"
-      case InternalTypes.CHAR =>  s"$rowTerm.setChar($indexTerm, $fieldTerm)"
       case _: DateType =>  s"$rowTerm.setInt($indexTerm, $fieldTerm)"
       case InternalTypes.TIME =>  s"$rowTerm.setInt($indexTerm, $fieldTerm)"
       case _: TimestampType =>  s"$rowTerm.setLong($indexTerm, $fieldTerm)"
@@ -541,7 +534,6 @@ object CodeGenUtils {
       elementType: InternalType): String = elementType match {
     case InternalTypes.BOOLEAN => s"$arrayTerm.setNullBoolean($index)"
     case InternalTypes.BYTE => s"$arrayTerm.setNullByte($index)"
-    case InternalTypes.CHAR => s"$arrayTerm.setNullChar($index)"
     case InternalTypes.SHORT => s"$arrayTerm.setNullShort($index)"
     case InternalTypes.INT => s"$arrayTerm.setNullInt($index)"
     case InternalTypes.LONG => s"$arrayTerm.setNullLong($index)"
@@ -593,7 +585,6 @@ object CodeGenUtils {
       case InternalTypes.STRING => s"$writerTerm.writeString($indexTerm, $fieldValTerm)"
       case d: DecimalType =>
         s"$writerTerm.writeDecimal($indexTerm, $fieldValTerm, ${d.precision()})"
-      case InternalTypes.CHAR => s"$writerTerm.writeChar($indexTerm, $fieldValTerm)"
       case _: DateType => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
       case InternalTypes.TIME => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
       case _: TimestampType => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index dd01fb8..3de7ab6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -649,11 +649,6 @@ object FunctionGenerator {
 
   addSqlFunction(
     PRINT,
-    Seq(InternalTypes.STRING, InternalTypes.CHAR),
-    new PrintCallGen())
-
-  addSqlFunction(
-    PRINT,
     Seq(InternalTypes.STRING, InternalTypes.DATE),
     new PrintCallGen())
 
@@ -736,11 +731,6 @@ object FunctionGenerator {
 
   addSqlFunction(
     IF,
-    Seq(InternalTypes.BOOLEAN, InternalTypes.CHAR, InternalTypes.CHAR),
-    new IfCallGen())
-
-  addSqlFunction(
-    IF,
     Seq(InternalTypes.BOOLEAN, InternalTypes.DATE, InternalTypes.DATE),
     new IfCallGen())
 
@@ -828,11 +818,6 @@ object FunctionGenerator {
 
   addSqlFunction(
     HASH_CODE,
-    Seq(InternalTypes.CHAR),
-    new HashCodeCallGen())
-
-  addSqlFunction(
-    HASH_CODE,
     Seq(InternalTypes.DATE),
     new HashCodeCallGen())
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
index efb1c76..c2603c0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
@@ -820,10 +820,6 @@ object ScalarOperatorGens {
         terms => s""" "" + ${terms.head}"""
       }
 
-    // * -> Character
-    case (_, InternalTypes.CHAR) =>
-      throw new CodeGenException("Character type not supported.")
-
     // String -> Boolean
     case (InternalTypes.STRING, InternalTypes.BOOLEAN) =>
       generateUnaryOperatorIfNotNull(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
index 57a6421..5d963d7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala
@@ -408,7 +408,6 @@ class SortCodeGenerator(
     case InternalTypes.FLOAT => "Float"
     case InternalTypes.DOUBLE => "Double"
     case InternalTypes.BOOLEAN => "Boolean"
-    case InternalTypes.CHAR => "Char"
     case InternalTypes.STRING => "String"
     case InternalTypes.BINARY => "Binary"
     case _: DecimalType => "Decimal"
@@ -444,7 +443,6 @@ class SortCodeGenerator(
       case InternalTypes.BOOLEAN => 1
       case InternalTypes.BYTE => 1
       case InternalTypes.SHORT => 2
-      case InternalTypes.CHAR => 2
       case InternalTypes.INT => 4
       case InternalTypes.FLOAT => 4
       case InternalTypes.DOUBLE => 8
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 07b9d93..efdaed3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -91,7 +91,6 @@ object TypeCheckUtils {
          | InternalTypes.FLOAT
          | InternalTypes.DOUBLE
          | InternalTypes.BOOLEAN
-         | InternalTypes.CHAR
          | _: DateType
          | InternalTypes.TIME
          | _: TimestampType => false
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
index e559dd2..d88c23e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
@@ -86,8 +86,6 @@ object TypeCoercion {
 
     case (_, InternalTypes.STRING) => true
 
-    case (_, InternalTypes.CHAR) => false // Character type not supported.
-
     case (InternalTypes.STRING, b) if isNumeric(b) => true
     case (InternalTypes.STRING, InternalTypes.BOOLEAN) => true
     case (InternalTypes.STRING, _: DecimalType) => true
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
index cb111cf..006ab43 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
@@ -83,7 +83,6 @@ public class SortCodeGeneratorTest {
 			InternalTypes.LONG,
 			InternalTypes.FLOAT,
 			InternalTypes.DOUBLE,
-			InternalTypes.CHAR,
 			InternalTypes.STRING,
 			new DecimalType(18, 2),
 			new DecimalType(38, 18),
@@ -211,8 +210,6 @@ public class SortCodeGeneratorTest {
 				seeds[i] = rnd.nextFloat() * rnd.nextLong();
 			} else if (type.equals(InternalTypes.DOUBLE)) {
 				seeds[i] = rnd.nextDouble() * rnd.nextLong();
-			} else if (type.equals(InternalTypes.CHAR)) {
-				seeds[i] = (char) rnd.nextInt();
 			} else if (type.equals(InternalTypes.STRING)) {
 				seeds[i] = BinaryString.fromString(RandomStringUtils.random(rnd.nextInt(20)));
 			} else if (type instanceof DecimalType) {
@@ -263,8 +260,6 @@ public class SortCodeGeneratorTest {
 			return Float.MIN_VALUE;
 		} else if (type.equals(InternalTypes.DOUBLE)) {
 			return Double.MIN_VALUE;
-		} else if (type.equals(InternalTypes.CHAR)) {
-			return '1';
 		} else if (type.equals(InternalTypes.STRING)) {
 			return BinaryString.fromString("");
 		} else if (type instanceof DecimalType) {
@@ -307,8 +302,6 @@ public class SortCodeGeneratorTest {
 			return 0f;
 		} else if (type.equals(InternalTypes.DOUBLE)) {
 			return 0d;
-		} else if (type.equals(InternalTypes.CHAR)) {
-			return '0';
 		} else if (type.equals(InternalTypes.STRING)) {
 			return BinaryString.fromString("0");
 		} else if (type instanceof DecimalType) {
@@ -348,8 +341,6 @@ public class SortCodeGeneratorTest {
 			return Float.MAX_VALUE;
 		} else if (type.equals(InternalTypes.DOUBLE)) {
 			return Double.MAX_VALUE;
-		} else if (type.equals(InternalTypes.CHAR)) {
-			return '鼎';
 		} else if (type.equals(InternalTypes.STRING)) {
 			return BinaryString.fromString(RandomStringUtils.random(100));
 		} else if (type instanceof DecimalType) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index 63fa9fc..ea84acc 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -42,7 +42,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 	private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
 	private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
 	private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
-	private static final int CHAR_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(char[].class);
 	private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
 	private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
 	private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
@@ -67,8 +66,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 			return 4;
 		} else if (type.equals(InternalTypes.FLOAT)) {
 			return 4;
-		} else if (type.equals(InternalTypes.CHAR)) {
-			return 2;
 		} else if (type.equals(InternalTypes.DATE)) {
 			return 4;
 		} else if (type.equals(InternalTypes.TIME)) {
@@ -326,19 +323,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 	}
 
 	@Override
-	public char getChar(int pos) {
-		assertIndexIsValid(pos);
-		return SegmentsUtil.getChar(segments, getElementOffset(pos, 2));
-	}
-
-	@Override
-	public void setChar(int pos, char value) {
-		assertIndexIsValid(pos);
-		setNotNullAt(pos);
-		SegmentsUtil.setChar(segments, getElementOffset(pos, 2), value);
-	}
-
-	@Override
 	public void setDecimal(int pos, Decimal value, int precision) {
 		assertIndexIsValid(pos);
 
@@ -369,12 +353,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		}
 	}
 
-	public void setNullChar(int pos) {
-		assertIndexIsValid(pos);
-		SegmentsUtil.bitSet(segments, offset + 4, pos);
-		SegmentsUtil.setChar(segments, getElementOffset(pos, 2), '\0');
-	}
-
 	public boolean anyNull() {
 		for (int i = offset + 4; i < elementOffset; i += 4) {
 			if (SegmentsUtil.getInt(segments, i) != 0) {
@@ -414,14 +392,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
-	public char[] toCharArray() {
-		checkNoNull();
-		char[] values = new char[numElements];
-		SegmentsUtil.copyToUnsafe(
-				segments, elementOffset, values, CHAR_ARRAY_OFFSET, numElements * 2);
-		return values;
-	}
-
 	public int[] toIntArray() {
 		checkNoNull();
 		int[] values = new int[numElements];
@@ -505,10 +475,6 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
 	}
 
-	public static BinaryArray fromPrimitiveArray(char[] arr) {
-		return fromPrimitiveArray(arr, CHAR_ARRAY_OFFSET, arr.length, 2);
-	}
-
 	public static BinaryArray fromPrimitiveArray(int[] arr) {
 		return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
index 01ea127..37d2b26 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
@@ -129,8 +129,6 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter {
 			setNullInt(pos);
 		} else if (type.equals(InternalTypes.TIMESTAMP)) {
 			setNullLong(pos);
-		} else if (type.equals(InternalTypes.CHAR)) {
-			setNullShort(pos);
 		} else {
 			setNullAt(pos);
 		}
@@ -193,11 +191,6 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter {
 	}
 
 	@Override
-	public void writeChar(int pos, char value) {
-		segment.putChar(getElementOffset(pos, 2), value);
-	}
-
-	@Override
 	public void afterGrow() {
 		array.pointTo(segment, 0, segment.size());
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index e529389..99ea018 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -73,7 +73,6 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 				InternalTypes.LONG,
 				InternalTypes.FLOAT,
 				InternalTypes.DOUBLE,
-				InternalTypes.CHAR,
 				InternalTypes.TIMESTAMP,
 				InternalTypes.DATE,
 				InternalTypes.TIME,
@@ -192,13 +191,6 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public void setChar(int pos, char value) {
-		assertIndexIsValid(pos);
-		setNotNullAt(pos);
-		segments[0].putChar(getFieldOffset(pos), value);
-	}
-
-	@Override
 	public void setDecimal(int pos, Decimal value, int precision) {
 		assertIndexIsValid(pos);
 
@@ -300,12 +292,6 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public char getChar(int pos) {
-		assertIndexIsValid(pos);
-		return segments[0].getChar(getFieldOffset(pos));
-	}
-
-	@Override
 	public BinaryString getString(int pos) {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
index 6a06b8a..8c5f656 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRowWriter.java
@@ -108,11 +108,6 @@ public final class BinaryRowWriter extends AbstractBinaryWriter {
 	}
 
 	@Override
-	public void writeChar(int pos, char value) {
-		segment.putChar(getFieldOffset(pos), value);
-	}
-
-	@Override
 	public void complete() {
 		row.setTotalSize(cursor);
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index aafcb3f..9c6e605 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -60,8 +60,6 @@ public interface BinaryWriter {
 
 	void writeDouble(int pos, double value);
 
-	void writeChar(int pos, char value);
-
 	void writeString(int pos, BinaryString value);
 
 	void writeBinary(int pos, byte[] bytes);
@@ -98,8 +96,6 @@ public interface BinaryWriter {
 			writer.writeDouble(pos, (double) o);
 		} else if (type.equals(InternalTypes.STRING)) {
 			writer.writeString(pos, (BinaryString) o);
-		} else if (type.equals(InternalTypes.CHAR)) {
-			writer.writeChar(pos, (char) o);
 		} else if (type instanceof DateType) {
 			writer.writeInt(pos, (int) o);
 		} else if (type.equals(InternalTypes.TIME)) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BoxedWrapperRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BoxedWrapperRow.java
index 691c2f7..3462e86 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BoxedWrapperRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BoxedWrapperRow.java
@@ -19,7 +19,6 @@ package org.apache.flink.table.dataformat;
 
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
@@ -71,11 +70,6 @@ public final class BoxedWrapperRow extends ObjectArrayRow {
 	}
 
 	@Override
-	public char getChar(int i) {
-		return ((CharValue) fields[i]).getValue();
-	}
-
-	@Override
 	public void setBoolean(int i, boolean value) {
 		BooleanValue wrap;
 		if ((wrap = (BooleanValue) fields[i]) == null) {
@@ -145,16 +139,6 @@ public final class BoxedWrapperRow extends ObjectArrayRow {
 		wrap.setValue(value);
 	}
 
-	@Override
-	public void setChar(int i, char value) {
-		CharValue wrap;
-		if ((wrap = (CharValue) fields[i]) == null) {
-			wrap = new CharValue();
-			fields[i] = wrap;
-		}
-		wrap.setValue(value);
-	}
-
 	public void setNonPrimitiveValue(int i, Object value) {
 		fields[i] = value;
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index ebc59bc..0d195d6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -84,7 +84,6 @@ public class DataFormatConverters {
 		t2C.put(BasicTypeInfo.DOUBLE_TYPE_INFO, DoubleConverter.INSTANCE);
 		t2C.put(BasicTypeInfo.SHORT_TYPE_INFO, ShortConverter.INSTANCE);
 		t2C.put(BasicTypeInfo.BYTE_TYPE_INFO, ByteConverter.INSTANCE);
-		t2C.put(BasicTypeInfo.CHAR_TYPE_INFO, CharConverter.INSTANCE);
 		t2C.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, new BigDecimalConverter(
 				DecimalType.SYSTEM_DEFAULT.precision(),
 				DecimalType.SYSTEM_DEFAULT.scale()));
@@ -96,7 +95,6 @@ public class DataFormatConverters {
 		t2C.put(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveDoubleArrayConverter.INSTANCE);
 		t2C.put(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveShortArrayConverter.INSTANCE);
 		t2C.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveByteArrayConverter.INSTANCE);
-		t2C.put(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveCharArrayConverter.INSTANCE);
 
 		t2C.put(SqlTimeTypeInfo.DATE, DateConverter.INSTANCE);
 		t2C.put(SqlTimeTypeInfo.TIME, TimeConverter.INSTANCE);
@@ -348,23 +346,6 @@ public class DataFormatConverters {
 	}
 
 	/**
-	 * Converter for char.
-	 */
-	public static final class CharConverter extends IdentityConverter<Character> {
-
-		private static final long serialVersionUID = -7631466361315237011L;
-
-		public static final CharConverter INSTANCE = new CharConverter();
-
-		private CharConverter() {}
-
-		@Override
-		Character toExternalImpl(BaseRow row, int column) {
-			return row.getChar(column);
-		}
-	}
-
-	/**
 	 * Converter for BinaryString.
 	 */
 	public static final class BinaryStringConverter extends IdentityConverter<BinaryString> {
@@ -801,33 +782,6 @@ public class DataFormatConverters {
 	}
 
 	/**
-	 * Converter for primitive char array.
-	 */
-	public static final class PrimitiveCharArrayConverter extends DataFormatConverter<BinaryArray, char[]> {
-
-		private static final long serialVersionUID = -5438377988505771316L;
-
-		public static final PrimitiveCharArrayConverter INSTANCE = new PrimitiveCharArrayConverter();
-
-		private PrimitiveCharArrayConverter() {}
-
-		@Override
-		BinaryArray toInternalImpl(char[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
-		}
-
-		@Override
-		char[] toExternalImpl(BinaryArray value) {
-			return value.toCharArray();
-		}
-
-		@Override
-		char[] toExternalImpl(BaseRow row, int column) {
-			return toExternalImpl(row.getArray(column));
-		}
-	}
-
-	/**
 	 * Converter for object array.
 	 */
 	public static final class ObjectArrayConverter<T> extends DataFormatConverter<BinaryArray, T[]> {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
index 4822595..4aa4f7d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
@@ -69,11 +69,6 @@ public final class GenericRow extends ObjectArrayRow {
 	}
 
 	@Override
-	public char getChar(int ordinal) {
-		return (char) this.fields[ordinal];
-	}
-
-	@Override
 	public void setBoolean(int ordinal, boolean value) {
 		this.fields[ordinal] = value;
 	}
@@ -108,11 +103,6 @@ public final class GenericRow extends ObjectArrayRow {
 		this.fields[ordinal] = value;
 	}
 
-	@Override
-	public void setChar(int ordinal, char value) {
-		this.fields[ordinal] = value;
-	}
-
 	public void setField(int ordinal, Object value) {
 		this.fields[ordinal] = value;
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
index 0747c7b..a92ca48 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
@@ -127,15 +127,6 @@ public final class JoinedRow implements BaseRow {
 	}
 
 	@Override
-	public char getChar(int i) {
-		if (i < row1.getArity()) {
-			return row1.getChar(i);
-		} else {
-			return row2.getChar(i - row1.getArity());
-		}
-	}
-
-	@Override
 	public Decimal getDecimal(int i, int precision, int scale) {
 		if (i < row1.getArity()) {
 			return row1.getDecimal(i, precision, scale);
@@ -271,15 +262,6 @@ public final class JoinedRow implements BaseRow {
 	}
 
 	@Override
-	public void setChar(int i, char value) {
-		if (i < row1.getArity()) {
-			row1.setChar(i, value);
-		} else {
-			row2.setChar(i - row1.getArity(), value);
-		}
-	}
-
-	@Override
 	public void setDecimal(int i, Decimal value, int precision) {
 		if (i < row1.getArity()) {
 			row1.setDecimal(i, value, precision);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 1c8b43c..c211e52 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -109,13 +109,6 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public void setChar(int pos, char value) {
-		assertIndexIsValid(pos);
-		setNotNullAt(pos);
-		SegmentsUtil.setChar(segments, getFieldOffset(pos), value);
-	}
-
-	@Override
 	public void setDecimal(int pos, Decimal value, int precision) {
 		assertIndexIsValid(pos);
 
@@ -223,12 +216,6 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public char getChar(int pos) {
-		assertIndexIsValid(pos);
-		return SegmentsUtil.getChar(segments, getFieldOffset(pos));
-	}
-
-	@Override
 	public BinaryString getString(int pos) {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index ab263c4..da83973 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -87,11 +87,6 @@ public interface TypeGetterSetters {
 	double getDouble(int ordinal);
 
 	/**
-	 * Get char value.
-	 */
-	char getChar(int ordinal);
-
-	/**
 	 * Get string value, internal format is BinaryString.
 	 */
 	BinaryString getString(int ordinal);
@@ -162,11 +157,6 @@ public interface TypeGetterSetters {
 	void setDouble(int ordinal, double value);
 
 	/**
-	 * Set char value.
-	 */
-	void setChar(int ordinal, char value);
-
-	/**
 	 * Set the decimal column value.
 	 *
 	 * <p>Note:
@@ -193,8 +183,6 @@ public interface TypeGetterSetters {
 			return row.getDouble(ordinal);
 		} else if (type.equals(InternalTypes.STRING)) {
 			return row.getString(ordinal);
-		} else if (type.equals(InternalTypes.CHAR)) {
-			return row.getChar(ordinal);
 		} else if (type instanceof DateType) {
 			return row.getInt(ordinal);
 		} else if (type.equals(InternalTypes.TIME)) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
index 28cb942..d385181 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
@@ -92,11 +92,6 @@ public final class UpdatableRow implements BaseRow {
 	}
 
 	@Override
-	public char getChar(int ordinal) {
-		return updated[ordinal] ? (char) fields[ordinal] : row.getChar(ordinal);
-	}
-
-	@Override
 	public byte[] getBinary(int ordinal) {
 		return updated[ordinal] ? (byte[]) fields[ordinal] : row.getBinary(ordinal);
 	}
@@ -172,11 +167,6 @@ public final class UpdatableRow implements BaseRow {
 	}
 
 	@Override
-	public void setChar(int ordinal, char value) {
-		setField(ordinal, value);
-	}
-
-	@Override
 	public void setDecimal(int ordinal, Decimal value, int precision) {
 		setField(ordinal, value);
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
index e199efe..a56d24a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java
@@ -122,10 +122,6 @@ public class SortUtil {
 		NormalizedKeyUtil.putUnsignedLongNormalizedKey(lValue, target, offset, numBytes);
 	}
 
-	public static void putCharNormalizedKey(char value, MemorySegment target, int offset, int numBytes) {
-		NormalizedKeyUtil.putCharNormalizedKey(value, target, offset, numBytes);
-	}
-
 	public static void putBinaryNormalizedKey(
 			byte[] value, MemorySegment target, int offset, int numBytes) {
 		final int limit = offset + numBytes;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
deleted file mode 100644
index 90aced6..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.type;
-
-/**
- * Char type.
- */
-public class CharType extends PrimitiveType {
-
-	private static final long serialVersionUID = 1L;
-
-	public static final CharType INSTANCE = new CharType();
-
-	private CharType() {}
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypeUtils.java
index 1bbf00d..ffd4168 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypeUtils.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.table.type.InternalTypes.BYTE;
-import static org.apache.flink.table.type.InternalTypes.CHAR;
 import static org.apache.flink.table.type.InternalTypes.DOUBLE;
 import static org.apache.flink.table.type.InternalTypes.FLOAT;
 import static org.apache.flink.table.type.InternalTypes.INT;
@@ -39,10 +38,10 @@ public class InternalTypeUtils {
 
 	static {
 		Map<InternalType, InternalType[]> autoCastMap = new HashMap<>();
-		autoCastMap.put(BYTE, new InternalType[]{SHORT, INT, LONG, FLOAT, DOUBLE, CHAR});
-		autoCastMap.put(SHORT, new InternalType[]{INT, LONG, FLOAT, DOUBLE, CHAR});
-		autoCastMap.put(INT, new InternalType[]{LONG, FLOAT, DOUBLE, CHAR});
-		autoCastMap.put(LONG, new InternalType[]{FLOAT, DOUBLE, CHAR});
+		autoCastMap.put(BYTE, new InternalType[]{SHORT, INT, LONG, FLOAT, DOUBLE});
+		autoCastMap.put(SHORT, new InternalType[]{INT, LONG, FLOAT, DOUBLE});
+		autoCastMap.put(INT, new InternalType[]{LONG, FLOAT, DOUBLE});
+		autoCastMap.put(LONG, new InternalType[]{FLOAT, DOUBLE});
 		autoCastMap.put(FLOAT, new InternalType[]{DOUBLE});
 		POSSIBLE_CAST_MAP = Collections.unmodifiableMap(autoCastMap);
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
index 7c51b4e..caeffff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java
@@ -41,8 +41,6 @@ public class InternalTypes {
 
 	public static final ShortType SHORT = ShortType.INSTANCE;
 
-	public static final CharType CHAR = CharType.INSTANCE;
-
 	public static final BinaryType BINARY = BinaryType.INSTANCE;
 
 	public static final DateType DATE = DateType.DATE;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
index 8cf5ad9..3397817 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -64,7 +64,6 @@ public class TypeConverters {
 		tiToType.put(BasicTypeInfo.INT_TYPE_INFO, InternalTypes.INT);
 		tiToType.put(BasicTypeInfo.LONG_TYPE_INFO, InternalTypes.LONG);
 		tiToType.put(BasicTypeInfo.SHORT_TYPE_INFO, InternalTypes.SHORT);
-		tiToType.put(BasicTypeInfo.CHAR_TYPE_INFO, InternalTypes.CHAR);
 		tiToType.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, InternalTypes.BINARY);
 		tiToType.put(SqlTimeTypeInfo.DATE, InternalTypes.DATE);
 		tiToType.put(SqlTimeTypeInfo.TIMESTAMP, InternalTypes.TIMESTAMP);
@@ -91,7 +90,6 @@ public class TypeConverters {
 		internalTypeToInfo.put(InternalTypes.INT, BasicTypeInfo.INT_TYPE_INFO);
 		internalTypeToInfo.put(InternalTypes.LONG, BasicTypeInfo.LONG_TYPE_INFO);
 		internalTypeToInfo.put(InternalTypes.SHORT, BasicTypeInfo.SHORT_TYPE_INFO);
-		internalTypeToInfo.put(InternalTypes.CHAR, BasicTypeInfo.CHAR_TYPE_INFO);
 		internalTypeToInfo.put(InternalTypes.DATE, BasicTypeInfo.INT_TYPE_INFO);
 		internalTypeToInfo.put(InternalTypes.TIMESTAMP, BasicTypeInfo.LONG_TYPE_INFO);
 		internalTypeToInfo.put(InternalTypes.PROCTIME_INDICATOR, BasicTypeInfo.LONG_TYPE_INFO);
@@ -111,7 +109,6 @@ public class TypeConverters {
 		itToEti.put(InternalTypes.INT, BasicTypeInfo.INT_TYPE_INFO);
 		itToEti.put(InternalTypes.LONG, BasicTypeInfo.LONG_TYPE_INFO);
 		itToEti.put(InternalTypes.SHORT, BasicTypeInfo.SHORT_TYPE_INFO);
-		itToEti.put(InternalTypes.CHAR, BasicTypeInfo.CHAR_TYPE_INFO);
 		itToEti.put(InternalTypes.DATE, SqlTimeTypeInfo.DATE);
 		itToEti.put(InternalTypes.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP);
 		itToEti.put(InternalTypes.PROCTIME_INDICATOR, SqlTimeTypeInfo.TIMESTAMP);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
index 2c84b21..daf7ebd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
@@ -947,32 +947,6 @@ public class SegmentsUtil {
 		}
 	}
 
-	/**
-	 * get char from segments.
-	 *
-	 * @param segments target segments.
-	 * @param offset value offset.
-	 */
-	public static char getChar(MemorySegment[] segments, int offset) {
-		if (inFirstSegment(segments, offset, 2)) {
-			return segments[0].getChar(offset);
-		} else {
-			return getCharMultiSegments(segments, offset);
-		}
-	}
-
-	private static char getCharMultiSegments(MemorySegment[] segments, int offset) {
-		int segSize = segments[0].size();
-		int segIndex = offset / segSize;
-		int segOffset = offset - segIndex * segSize; // equal to %
-
-		if (segOffset < segSize - 1) {
-			return segments[segIndex].getChar(segOffset);
-		} else {
-			return (char) getTwoByteSlowly(segments, segSize, segIndex, segOffset);
-		}
-	}
-
 	private static int getTwoByteSlowly(
 			MemorySegment[] segments, int segSize, int segNum, int segOffset) {
 		MemorySegment segment = segments[segNum];
@@ -993,32 +967,6 @@ public class SegmentsUtil {
 		return ret;
 	}
 
-	/**
-	 * set char from segments.
-	 *
-	 * @param segments target segments.
-	 * @param offset value offset.
-	 */
-	public static void setChar(MemorySegment[] segments, int offset, char value) {
-		if (inFirstSegment(segments, offset, 2)) {
-			segments[0].putChar(offset, value);
-		} else {
-			setCharMultiSegments(segments, offset, value);
-		}
-	}
-
-	private static void setCharMultiSegments(MemorySegment[] segments, int offset, char value) {
-		int segSize = segments[0].size();
-		int segIndex = offset / segSize;
-		int segOffset = offset - segIndex * segSize; // equal to %
-
-		if (segOffset < segSize - 3) {
-			segments[segIndex].putChar(segOffset, value);
-		} else {
-			setTwoByteSlowly(segments, segSize, segIndex, segOffset, value, value >> 8);
-		}
-	}
-
 	private static void setTwoByteSlowly(
 			MemorySegment[] segments, int segSize, int segNum, int segOffset, int b1, int b2) {
 		MemorySegment segment = segments[segNum];
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index 35dc8d6..d0c76f3 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -94,7 +94,6 @@ public class BaseRowTest {
 		writer.writeLong(4, 4);
 		writer.writeFloat(5, 5);
 		writer.writeDouble(6, 6);
-		writer.writeChar(7, (char) 7);
 		writer.writeString(8, str);
 		writer.writeGeneric(9, generic);
 		writer.writeDecimal(10, decimal1, 5);
@@ -139,7 +138,6 @@ public class BaseRowTest {
 		row.setLong(4, (long) 4);
 		row.setFloat(5, (float) 5);
 		row.setDouble(6, (double) 6);
-		row.setChar(7, (char) 7);
 		row.setNonPrimitiveValue(8, str);
 		row.setNonPrimitiveValue(9, generic);
 		row.setNonPrimitiveValue(10, decimal1);
@@ -191,7 +189,6 @@ public class BaseRowTest {
 		assertEquals(4, row.getLong(4));
 		assertEquals(5, (int) row.getFloat(5));
 		assertEquals(6, (int) row.getDouble(6));
-		assertEquals(7, row.getChar(7));
 		assertEquals(str, row.getString(8));
 		assertEquals(generic, row.getGeneric(9));
 		assertEquals(decimal1, row.getDecimal(10, 5, 0));
@@ -217,8 +214,6 @@ public class BaseRowTest {
 		assertEquals(6, (int) row.getFloat(5));
 		row.setDouble(6, 7);
 		assertEquals(7, (int) row.getDouble(6));
-		row.setChar(7, (char) 8);
-		assertEquals(8, row.getChar(7));
 		row.setDecimal(10, Decimal.fromLong(11, 5, 0), 5);
 		assertEquals(Decimal.fromLong(11, 5, 0), row.getDecimal(10, 5, 0));
 		row.setDecimal(11, Decimal.fromBigDecimal(new BigDecimal(12), 20, 0), 20);
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index beeb8c0..ff2b4fb 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -278,30 +278,6 @@ public class BinaryArrayTest {
 		}
 
 		{
-			// test char
-			BinaryArray array = new BinaryArray();
-			BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
-			writer.setNullShort(0);
-			writer.writeChar(1, (char) 25);
-			writer.complete();
-
-			assertTrue(array.isNullAt(0));
-			assertEquals(25, array.getChar(1));
-			array.setChar(0, (char) 5);
-			assertEquals(5, array.getChar(0));
-			array.setNullChar(0);
-			assertTrue(array.isNullAt(0));
-
-			BinaryArray newArray = splitArray(array);
-			assertTrue(newArray.isNullAt(0));
-			assertEquals(25, newArray.getChar(1));
-			newArray.setChar(0, (char) 5);
-			assertEquals(5, newArray.getChar(0));
-			newArray.setNullChar(0);
-			assertTrue(newArray.isNullAt(0));
-		}
-
-		{
 			// test string
 			BinaryArray array = new BinaryArray();
 			BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index 15334f1..5f8b24a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -78,7 +78,6 @@ public class BinaryRowTest {
 		row.setShort(5, (short) 55);
 		row.setByte(6, (byte) 66);
 		row.setFloat(7, 77f);
-		row.setChar(8, 'a');
 
 		assertEquals(33d, (long) row.getDouble(3), 0);
 		assertEquals(11, row.getInt(1));
@@ -88,7 +87,6 @@ public class BinaryRowTest {
 		assertEquals(true, row.getBoolean(4));
 		assertEquals((byte) 66, row.getByte(6));
 		assertEquals(77f, row.getFloat(7), 0);
-		assertEquals('a', row.getChar(8));
 	}
 
 	@Test
@@ -105,7 +103,6 @@ public class BinaryRowTest {
 
 		writer.writeBoolean(1, true);
 		writer.writeByte(2, (byte) 99);
-		writer.writeChar(4, 'x');
 		writer.writeDouble(6, 87.1d);
 		writer.writeFloat(7, 26.1f);
 		writer.writeInt(8, 88);
@@ -242,7 +239,6 @@ public class BinaryRowTest {
 		assertEquals((short) 292, row.getShort(11));
 		assertEquals(284, row.getLong(10));
 		assertEquals((byte) 99, row.getByte(2));
-		assertEquals('x', row.getChar(4));
 		assertEquals(87.1d, row.getDouble(6), 0);
 		assertEquals(26.1f, row.getFloat(7), 0);
 		assertEquals(true, row.getBoolean(1));
@@ -321,7 +317,6 @@ public class BinaryRowTest {
 			writer.writeString(9, fromString("啦啦啦啦啦我是快乐的粉刷匠"));
 			writer.writeBoolean(1, true);
 			writer.writeByte(2, (byte) 99);
-			writer.writeChar(4, 'x');
 			writer.writeDouble(6, 87.1d);
 			writer.writeFloat(7, 26.1f);
 			writer.writeInt(8, 88);