You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/23 13:51:38 UTC

[flink] branch master updated: [FLINK-13266][table] Port DataView related classes to flink-table-common

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1d2256  [FLINK-13266][table] Port DataView related classes to flink-table-common
d1d2256 is described below

commit d1d2256e22742511faabb6e75c80af75a0a8a58b
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 18 15:10:00 2019 +0800

    [FLINK-13266][table] Port DataView related classes to flink-table-common
---
 .../apache/flink/table/api/dataview/DataView.java  |   3 +
 .../apache/flink/table/api/dataview/ListView.java  |  10 +-
 .../apache/flink/table/api/dataview/MapView.java   |  10 +-
 .../flink/table/dataview}/ListViewSerializer.java  |  60 ++++++-
 .../table/dataview/ListViewSerializerSnapshot.java |   2 +
 .../flink/table/dataview}/ListViewTypeInfo.java    |   4 +-
 .../table/dataview}/ListViewTypeInfoFactory.java   |   4 +-
 .../flink/table/dataview}/MapViewSerializer.java   |  63 ++++++-
 .../table/dataview/MapViewSerializerSnapshot.java  |   2 +
 .../flink/table/dataview}/MapViewTypeInfo.java     |   4 +-
 .../table/dataview}/MapViewTypeInfoFactory.java    |   4 +-
 .../table/dataview}/NullAwareMapSerializer.java    |   4 +-
 .../flink/table/dataview}/NullSerializer.java      |   4 +-
 .../FirstValueWithRetractAggFunction.java          |   4 +-
 .../LastValueWithRetractAggFunction.java           |   4 +-
 .../apache/flink/table/dataview/DataViewSpec.scala |   1 -
 .../flink/table/plan/util/AggregateUtil.scala      |   9 +-
 .../apache/flink/table/api/dataview/DataView.scala |  35 ----
 .../flink/table/api/dataview/DataViewSpec.scala    |   5 +-
 .../apache/flink/table/api/dataview/ListView.scala | 142 ---------------
 .../apache/flink/table/api/dataview/MapView.scala  | 198 ---------------------
 .../flink/table/dataview/ListViewSerializer.scala  | 117 ------------
 .../flink/table/dataview/ListViewTypeInfo.scala    |  66 -------
 .../table/dataview/ListViewTypeInfoFactory.scala   |  43 -----
 .../flink/table/dataview/MapViewSerializer.scala   | 123 -------------
 .../flink/table/dataview/MapViewTypeInfo.scala     |  72 --------
 .../table/dataview/MapViewTypeInfoFactory.scala    |  51 ------
 .../aggfunctions/CollectAggFunction.scala          |   1 -
 .../functions/utils/UserDefinedFunctionUtils.scala |  30 ++--
 .../table/dataview/ListViewSerializerTest.scala    |   1 -
 .../table/dataview/MapViewSerializerTest.scala     |   1 -
 .../table/dataview/PerKeyStateDataViewStore.java   |   2 -
 .../dataview/PerWindowStateDataViewStore.java      |   2 -
 .../flink/table/dataview/StateDataViewStore.java   |   2 -
 .../table/types/TypeInfoDataTypeConverter.java     |   2 +-
 35 files changed, 171 insertions(+), 914 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
index 487ef79..8fe6aaf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.api.dataview;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 
 /**
@@ -27,6 +29,7 @@ import java.io.Serializable;
  * <p>Depending on the context in which the {@code AggregateFunction} is
  * used, a {@link DataView} can be backed by a Java heap collection or a state backend.
  */
+@PublicEvolving
 public interface DataView extends Serializable {
 
 	/**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
index 5f34e72..e78b09e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.api.dataview;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.typeutils.ListViewTypeInfoFactory;
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -75,6 +76,7 @@ import java.util.Objects;
  * }</pre>
  */
 @TypeInfo(ListViewTypeInfoFactory.class)
+@PublicEvolving
 public class ListView<T> implements DataView {
 	private static final long serialVersionUID = 5502298766901215388L;
 
@@ -106,7 +108,7 @@ public class ListView<T> implements DataView {
 	 * Returns an iterable of the list view.
 	 *
 	 * @throws Exception Thrown if the system cannot get data.
-	 * @return The iterable of the list or { @code null} if the list is empty.
+	 * @return The iterable of the list.
 	 */
 	public Iterable<T> get() throws Exception {
 		return list;
@@ -167,8 +169,4 @@ public class ListView<T> implements DataView {
 		return Objects.hash(elementType, list);
 	}
 
-	@Override
-	public String toString() {
-		return "ListView" + list.toString();
-	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
similarity index 96%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index aa476be..7feb07b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.api.dataview;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.typeutils.MapViewTypeInfoFactory;
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -76,6 +77,7 @@ import java.util.Map;
  *
  */
 @TypeInfo(MapViewTypeInfoFactory.class)
+@PublicEvolving
 public class MapView<K, V> implements DataView {
 
 	private static final long serialVersionUID = -6185595470714822744L;
@@ -108,7 +110,7 @@ public class MapView<K, V> implements DataView {
 	}
 
 	/**
-	 * Return the value for the specified key or { @code null } if the key is not in the map view.
+	 * Return the value for the specified key or {@code null} if the key is not in the map view.
 	 *
 	 * @param key The look up key.
 	 * @return The value for the specified key.
@@ -223,8 +225,4 @@ public class MapView<K, V> implements DataView {
 		return map.hashCode();
 	}
 
-	@Override
-	public String toString() {
-		return "MapView" + map.toString();
-	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
similarity index 54%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
index 9eb8af2..b4a3445 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
@@ -16,10 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.ListView;
@@ -36,9 +41,12 @@ import java.util.List;
  *
  * @param <T> The type of element in the list.
  */
-public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
+@Internal
+public class ListViewSerializer<T>
+		extends TypeSerializer<ListView<T>>
+		implements LegacySerializerSnapshotTransformer<ListView<T>> {
 
-	private static final long serialVersionUID = 3272986300876096397L;
+	private static final long serialVersionUID = -2030398712359267867L;
 
 	private final TypeSerializer<List<T>> listSerializer;
 
@@ -63,7 +71,7 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
 
 	@Override
 	public ListView<T> copy(ListView<T> from) {
-		return new ListView<>(null, from.list);
+		return new ListView<>(null, listSerializer.copy(from.list));
 	}
 
 	@Override
@@ -113,6 +121,48 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
 
 	@Override
 	public TypeSerializerSnapshot<ListView<T>> snapshotConfiguration() {
-		throw new UnsupportedOperationException();
+		return new ListViewSerializerSnapshot<>(this);
 	}
+
+	/**
+	 * We need to override this as a {@link LegacySerializerSnapshotTransformer}
+	 * because in Flink 1.6.x and below, this serializer was incorrectly returning
+	 * directly the snapshot of the nested list serializer as its own snapshot.
+	 *
+	 * <p>This method transforms the incorrect list serializer snapshot
+	 * to be a proper {@link ListViewSerializerSnapshot}.
+	 */
+	@Override
+	public <U> TypeSerializerSnapshot<ListView<T>> transformLegacySerializerSnapshot(
+			TypeSerializerSnapshot<U> legacySnapshot) {
+		if (legacySnapshot instanceof ListViewSerializerSnapshot) {
+			return (TypeSerializerSnapshot<ListView<T>>) legacySnapshot;
+		} else if (legacySnapshot instanceof CollectionSerializerConfigSnapshot) {
+			// first, transform the incorrect list serializer's snapshot
+			// into a proper ListSerializerSnapshot
+			ListSerializerSnapshot<T> transformedNestedListSerializerSnapshot = new ListSerializerSnapshot<>();
+			CollectionSerializerConfigSnapshot<List<T>, T> snapshot =
+					(CollectionSerializerConfigSnapshot<List<T>, T>) legacySnapshot;
+			CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+					transformedNestedListSerializerSnapshot,
+					(TypeSerializerSnapshot<?>) (snapshot.getSingleNestedSerializerAndConfig().f1));
+
+			// then, wrap the transformed ListSerializerSnapshot
+			// as a nested snapshot in the final resulting ListViewSerializerSnapshot
+			ListViewSerializerSnapshot<T> transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot<>();
+			CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+					transformedListViewSerializerSnapshot,
+					transformedNestedListSerializerSnapshot);
+
+			return transformedListViewSerializerSnapshot;
+		} else {
+			throw new UnsupportedOperationException(
+					legacySnapshot.getClass().getCanonicalName() + " is not supported.");
+		}
+	}
+
+	public TypeSerializer<List<T>> getListSerializer() {
+		return listSerializer;
+	}
+
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index a130271..72547fd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -30,6 +31,7 @@ import java.util.List;
  *
  * @param <T> the type of the list elements.
  */
+@Internal
 public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer<T>> {
 
 	private static final int CURRENT_VERSION = 1;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
similarity index 97%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
index f0c71a4..951b140 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.table.api.dataview.ListView;
  *
  * @param <T> element type
  */
+@Internal
 public class ListViewTypeInfo<T> extends TypeInformation<ListView<T>> {
 
 	private static final long serialVersionUID = 6468505781419989441L;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
similarity index 94%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
index 7dc9227..9aef459 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -29,6 +30,7 @@ import java.util.Map;
 /**
  * TypeInformation factory for {@link ListView}.
  */
+@Internal
 public class ListViewTypeInfoFactory<T> extends TypeInfoFactory<ListView<T>> {
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
similarity index 55%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
index 3f697d6..b34b698 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
@@ -16,10 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.MapView;
@@ -38,8 +43,12 @@ import java.util.Map;
  * @param <K> The type of the keys in the map.
  * @param <V> The type of the values in the map.
  */
-public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
-	private static final long serialVersionUID = 202624224378185203L;
+@Internal
+public class MapViewSerializer<K, V>
+		extends TypeSerializer<MapView<K, V>>
+		implements LegacySerializerSnapshotTransformer<MapView<K, V>> {
+
+	private static final long serialVersionUID = -9007142882049098705L;
 
 	private final TypeSerializer<Map<K, V>> mapSerializer;
 
@@ -54,7 +63,7 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
 
 	@Override
 	public TypeSerializer<MapView<K, V>> duplicate() {
-		return new MapViewSerializer<>(mapSerializer);
+		return new MapViewSerializer<>(mapSerializer.duplicate());
 	}
 
 	@Override
@@ -114,6 +123,50 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
 
 	@Override
 	public TypeSerializerSnapshot<MapView<K, V>> snapshotConfiguration() {
-		throw new UnsupportedOperationException();
+		return new MapViewSerializerSnapshot<>(this);
+	}
+
+	/**
+	 * We need to override this as a {@link LegacySerializerSnapshotTransformer}
+	 * because in Flink 1.6.x and below, this serializer was incorrectly returning
+	 * directly the snapshot of the nested map serializer as its own snapshot.
+	 *
+	 * <p>This method transforms the incorrect map serializer snapshot
+	 * to be a proper {@link MapViewSerializerSnapshot}.
+	 */
+	@Override
+	public <U> TypeSerializerSnapshot<MapView<K, V>> transformLegacySerializerSnapshot(
+			TypeSerializerSnapshot<U> legacySnapshot) {
+		if (legacySnapshot instanceof MapViewSerializerSnapshot) {
+			return (TypeSerializerSnapshot<MapView<K, V>>) legacySnapshot;
+		} else if (legacySnapshot instanceof MapSerializerConfigSnapshot) {
+			// first, transform the incorrect map serializer's snapshot
+			// into a proper MapSerializerSnapshot
+			MapSerializerSnapshot<K, V> transformedNestedMapSerializerSnapshot = new MapSerializerSnapshot<>();
+			MapSerializerConfigSnapshot<K, V> snapshot = (MapSerializerConfigSnapshot<K, V>) legacySnapshot;
+			CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+					transformedNestedMapSerializerSnapshot,
+					snapshot.getNestedSerializersAndConfigs().get(0).f1,
+					snapshot.getNestedSerializersAndConfigs().get(1).f1
+			);
+
+			// then, wrap the transformed MapSerializerSnapshot
+			// as a nested snapshot in the final resulting MapViewSerializerSnapshot
+			MapViewSerializerSnapshot<K, V> transformedMapViewSerializerSnapshot = new MapViewSerializerSnapshot<>();
+			CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+					transformedMapViewSerializerSnapshot,
+					transformedNestedMapSerializerSnapshot
+			);
+
+			return transformedMapViewSerializerSnapshot;
+		} else {
+			throw new UnsupportedOperationException(
+					legacySnapshot.getClass().getCanonicalName() + " is not supported.");
+		}
+	}
+
+	public TypeSerializer<Map<K, V>> getMapSerializer() {
+		return mapSerializer;
 	}
+
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index 618cf5c..8b58449 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -31,6 +32,7 @@ import java.util.Map;
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
+@Internal
 public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer<K, V>> {
 
 	private static final int CURRENT_VERSION = 1;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
similarity index 97%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
index b9ee0b2..af40f28 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -32,6 +33,7 @@ import java.util.Objects;
  * @param <K> key type
  * @param <V> value type
  */
+@Internal
 public class MapViewTypeInfo<K, V> extends TypeInformation<MapView<K, V>> {
 
 	private static final long serialVersionUID = -2883944144965318259L;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
index fa8abdb..294806d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -29,6 +30,7 @@ import java.util.Map;
 /**
  * TypeInformation factory for {@link MapView}.
  */
+@Internal
 public class MapViewTypeInfoFactory<K, V> extends TypeInfoFactory<MapView<K, V>> {
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
similarity index 98%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
index be08836..dc0b691 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,6 +32,7 @@ import java.util.Map;
  * The {@link NullAwareMapSerializer} is similar to MapSerializer, the only difference is that
  * the {@link NullAwareMapSerializer} can handle null keys.
  */
+@Internal
 public class NullAwareMapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 	private static final long serialVersionUID = 5363147328373166590L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
index 9d20f8e..8964d95 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -28,6 +29,7 @@ import java.io.IOException;
 /**
  * A serializer for null.
  */
+@Internal
 public class NullSerializer extends TypeSerializerSingleton<Object> {
 	private static final long serialVersionUID = -5381596724707742625L;
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
index 344e797..78cb668 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataview.MapViewSerializer;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer;
 import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalSerializer;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewSerializer;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 
 import java.util.ArrayList;
 import java.util.Iterator;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
index d07acfb..f6df480 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataview.MapViewSerializer;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer;
 import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalSerializer;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewSerializer;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 
 import java.util.ArrayList;
 import java.util.Iterator;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
index 9698758..7872664 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.dataview
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.typeutils.{ListViewTypeInfo, MapViewTypeInfo}
 
 /**
   * Data view specification.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 31784aa..933ec38 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -19,12 +19,12 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.JLong
-import org.apache.flink.table.api.{DataTypes, TableConfig, ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
-import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec}
+import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec, MapViewTypeInfo}
 import org.apache.flink.table.expressions.ExpressionUtils.extractValue
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
@@ -42,8 +42,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
 import org.apache.flink.table.types.logical.{LogicalTypeRoot, _}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.typeutils.{BinaryStringTypeInfo, MapViewTypeInfo}
-
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rex.RexInputRef
@@ -51,8 +50,6 @@ import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
 import org.apache.calcite.tools.RelBuilder
-
-import java.lang.{Long => JLong}
 import java.time.Duration
 import java.util
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala
deleted file mode 100644
index 2214086..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-/**
-  * A [[DataView]] is a collection type that can be used in the accumulator of an
-  * [[org.apache.flink.table.functions.AggregateFunction]].
-  *
-  * Depending on the context in which the [[org.apache.flink.table.functions.AggregateFunction]] is
-  * used, a [[DataView]] can be backed by a Java heap collection or a state backend.
-  */
-trait DataView extends Serializable {
-
-  /**
-    * Clears the [[DataView]] and removes all data.
-    */
-  def clear(): Unit
-
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
index 943fe03..c309bf1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.api.dataview
 
 import java.lang.reflect.Field
-
 import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, State, StateDescriptor}
 import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
 
@@ -41,7 +40,7 @@ case class ListViewSpec[T](
   extends DataViewSpec[ListView[T]] {
 
   override def toStateDescriptor: StateDescriptor[_ <: State, _] =
-    new ListStateDescriptor[T](stateId, listViewTypeInfo.elementType)
+    new ListStateDescriptor[T](stateId, listViewTypeInfo.getElementType)
 }
 
 case class MapViewSpec[K, V](
@@ -51,5 +50,5 @@ case class MapViewSpec[K, V](
   extends DataViewSpec[MapView[K, V]] {
 
   override def toStateDescriptor: StateDescriptor[_ <: State, _] =
-    new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.keyType, mapViewTypeInfo.valueType)
+    new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.getKeyType, mapViewTypeInfo.getValueType)
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
deleted file mode 100644
index 59b2426..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-import java.lang.{Iterable => JIterable}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
-import org.apache.flink.table.dataview.ListViewTypeInfoFactory
-
-/**
-  * A [[ListView]] provides List functionality for accumulators used by user-defined aggregate
-  * functions [[org.apache.flink.api.common.functions.AggregateFunction]].
-  *
-  * A [[ListView]] can be backed by a Java ArrayList or a state backend, depending on the context in
-  * which the aggregate function is used.
-  *
-  * At runtime [[ListView]] will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
-  * if it is backed by a state backend.
-  *
-  * Example of an accumulator type with a [[ListView]] and an aggregate function that uses it:
-  * {{{
-  *
-  *  public class MyAccum {
-  *    public ListView<String> list;
-  *    public long count;
-  *  }
-  *
-  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
-  *
-  *   @Override
-  *   public MyAccum createAccumulator() {
-  *     MyAccum accum = new MyAccum();
-  *     accum.list = new ListView<>(Types.STRING);
-  *     accum.count = 0L;
-  *     return accum;
-  *   }
-  *
-  *   public void accumulate(MyAccum accumulator, String id) {
-  *     accumulator.list.add(id);
-  *     ... ...
-  *     accumulator.get()
-  *     ... ...
-  *   }
-  *
-  *   @Override
-  *   public Long getValue(MyAccum accumulator) {
-  *     accumulator.list.add(id);
-  *     ... ...
-  *     accumulator.get()
-  *     ... ...
-  *     return accumulator.count;
-  *   }
-  * }
-  *
-  * }}}
-  *
-  * @param elementTypeInfo element type information
-  * @tparam T element type
-  */
-@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
-class ListView[T](
-    @transient private[flink] val elementTypeInfo: TypeInformation[T],
-    private[flink] val list: util.List[T])
-  extends DataView {
-
-  /**
-    * Creates a list view for elements of the specified type.
-    *
-    * @param elementTypeInfo The type of the list view elements.
-    */
-  def this(elementTypeInfo: TypeInformation[T]) {
-    this(elementTypeInfo, new util.ArrayList[T]())
-  }
-
-  /**
-    * Creates a list view.
-    */
-  def this() = this(null)
-
-  /**
-    * Returns an iterable of the list view.
-    *
-    * @throws Exception Thrown if the system cannot get data.
-    * @return The iterable of the list or { @code null} if the list is empty.
-    */
-  @throws[Exception]
-  def get: JIterable[T] = {
-    if (!list.isEmpty) {
-      list
-    } else {
-      null
-    }
-  }
-
-  /**
-    * Adds the given value to the list.
-    *
-    * @throws Exception Thrown if the system cannot add data.
-    * @param value The element to be appended to this list view.
-    */
-  @throws[Exception]
-  def add(value: T): Unit = list.add(value)
-
-  /**
-    * Adds all of the elements of the specified list to this list view.
-    *
-    * @throws Exception Thrown if the system cannot add all data.
-    * @param list The list with the elements that will be stored in this list view.
-    */
-  @throws[Exception]
-  def addAll(list: util.List[T]): Unit = this.list.addAll(list)
-
-  /**
-    * Removes all of the elements from this list view.
-    */
-  override def clear(): Unit = list.clear()
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ListView[T] =>
-      list.equals(that.list)
-    case _ => false
-  }
-
-  override def hashCode(): Int = list.hashCode()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
deleted file mode 100644
index 9206d6a..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-import java.lang.{Iterable => JIterable}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
-import org.apache.flink.table.dataview.MapViewTypeInfoFactory
-
-/**
-  * A [[MapView]] provides Map functionality for accumulators used by user-defined aggregate
-  * functions [[org.apache.flink.table.functions.AggregateFunction]].
-  *
-  * A [[MapView]] can be backed by a Java HashMap or a state backend, depending on the context in
-  * which the aggregation function is used.
-  *
-  * At runtime [[MapView]] will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
-  * if it is backed by a state backend.
-  *
-  * Example of an accumulator type with a [[MapView]] and an aggregate function that uses it:
-  * {{{
-  *
-  *  public class MyAccum {
-  *    public MapView<String, Integer> map;
-  *    public long count;
-  *  }
-  *
-  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
-  *
-  *    @Override
-  *    public MyAccum createAccumulator() {
-  *      MyAccum accum = new MyAccum();
-  *      accum.map = new MapView<>(Types.STRING, Types.INT);
-  *      accum.count = 0L;
-  *      return accum;
-  *    }
-  *
-  *    public void accumulate(MyAccum accumulator, String id) {
-  *      try {
-  *          if (!accumulator.map.contains(id)) {
-  *            accumulator.map.put(id, 1);
-  *            accumulator.count++;
-  *          }
-  *      } catch (Exception e) {
-  *        e.printStackTrace();
-  *      }
-  *    }
-  *
-  *    @Override
-  *    public Long getValue(MyAccum accumulator) {
-  *      return accumulator.count;
-  *    }
-  *  }
-  *
-  * }}}
-  *
-  * @param keyTypeInfo key type information
-  * @param valueTypeInfo value type information
-  * @tparam K key type
-  * @tparam V value type
-  */
-@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
-class MapView[K, V](
-    @transient private[flink] val keyTypeInfo: TypeInformation[K],
-    @transient private[flink] val valueTypeInfo: TypeInformation[V],
-    private[flink] val map: util.Map[K, V])
-  extends DataView {
-
-  /**
-    * Creates a MapView with the specified key and value types.
-    *
-    * @param keyTypeInfo The type of keys of the MapView.
-    * @param valueTypeInfo The type of the values of the MapView.
-    */
-  def this(keyTypeInfo: TypeInformation[K], valueTypeInfo: TypeInformation[V]) {
-    this(keyTypeInfo, valueTypeInfo, new util.HashMap[K, V]())
-  }
-
-  /**
-    * Creates a MapView.
-    */
-  def this() = this(null, null)
-
-  /**
-    * Return the value for the specified key or { @code null } if the key is not in the map view.
-    *
-    * @param key The look up key.
-    * @return The value for the specified key.
-    * @throws Exception Thrown if the system cannot get data.
-    */
-  @throws[Exception]
-  def get(key: K): V = map.get(key)
-
-  /**
-    * Inserts a value for the given key into the map view.
-    * If the map view already contains a value for the key, the existing value is overwritten.
-    *
-    * @param key   The key for which the value is inserted.
-    * @param value The value that is inserted for the key.
-    * @throws Exception Thrown if the system cannot put data.
-    */
-  @throws[Exception]
-  def put(key: K, value: V): Unit = map.put(key, value)
-
-  /**
-    * Inserts all mappings from the specified map to this map view.
-    *
-    * @param map The map whose entries are inserted into this map view.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
-
-  /**
-    * Deletes the value for the given key.
-    *
-    * @param key The key for which the value is deleted.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def remove(key: K): Unit = map.remove(key)
-
-  /**
-    * Checks if the map view contains a value for a given key.
-    *
-    * @param key The key to check.
-    * @return True if there exists a value for the given key, false otherwise.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def contains(key: K): Boolean = map.containsKey(key)
-
-  /**
-    * Returns all entries of the map view.
-    *
-    * @return An iterable of all the key-value pairs in the map view.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def entries: JIterable[util.Map.Entry[K, V]] = map.entrySet()
-
-  /**
-    * Returns all the keys in the map view.
-    *
-    * @return An iterable of all the keys in the map.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def keys: JIterable[K] = map.keySet()
-
-  /**
-    * Returns all the values in the map view.
-    *
-    * @return An iterable of all the values in the map.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def values: JIterable[V] = map.values()
-
-  /**
-    * Returns an iterator over all entries of the map view.
-    *
-    * @return An iterator over all the mappings in the map.
-    * @throws Exception Thrown if the system cannot access the map.
-    */
-  @throws[Exception]
-  def iterator: util.Iterator[util.Map.Entry[K, V]] = map.entrySet().iterator()
-
-  /**
-    * Removes all entries of this map.
-    */
-  override def clear(): Unit = map.clear()
-
-  override def equals(other: Any): Boolean = other match {
-    case that: MapView[K, V] =>
-        map.equals(that.map)
-    case _ => false
-  }
-
-  override def hashCode(): Int = map.hashCode()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
deleted file mode 100644
index 75f9326..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializerSnapshot}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.table.api.dataview.ListView
-
-/**
-  * A serializer for [[ListView]]. The serializer relies on an element
-  * serializer for the serialization of the list's elements.
-  *
-  * The serialization format for the list is as follows: four bytes for the length of the list,
-  * followed by the serialized representation of each element.
-  *
-  * @param listSerializer List serializer.
-  * @tparam T The type of element in the list.
-  */
-@SerialVersionUID(-2030398712359267867L)
-class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]])
-  extends TypeSerializer[ListView[T]]
-  with LegacySerializerSnapshotTransformer[ListView[T]] {
-
-  override def isImmutableType: Boolean = false
-
-  override def duplicate(): TypeSerializer[ListView[T]] = {
-    new ListViewSerializer[T](listSerializer.duplicate())
-  }
-
-  override def createInstance(): ListView[T] = {
-    new ListView[T]
-  }
-
-  override def copy(from: ListView[T]): ListView[T] = {
-    new ListView[T](null, listSerializer.copy(from.list))
-  }
-
-  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from)
-
-  override def getLength: Int = -1
-
-  override def serialize(record: ListView[T], target: DataOutputView): Unit = {
-    listSerializer.serialize(record.list, target)
-  }
-
-  override def deserialize(source: DataInputView): ListView[T] = {
-    new ListView[T](null, listSerializer.deserialize(source))
-  }
-
-  override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] =
-    deserialize(source)
-
-  override def copy(source: DataInputView, target: DataOutputView): Unit =
-    listSerializer.copy(source, target)
-
-  override def hashCode(): Int = listSerializer.hashCode()
-
-  override def equals(obj: Any): Boolean =
-    listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
-
-  override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
-    new ListViewSerializerSnapshot[T](this)
-
-  /**
-    * We need to override this as a [[LegacySerializerSnapshotTransformer]]
-    * because in Flink 1.6.x and below, this serializer was incorrectly returning
-    * directly the snapshot of the nested list serializer as its own snapshot.
-    *
-    * <p>This method transforms the incorrect list serializer snapshot
-    * to be a proper [[ListViewSerializerSnapshot]].
-    */
-  override def transformLegacySerializerSnapshot[U](
-      legacySnapshot: TypeSerializerSnapshot[U]
-  ): TypeSerializerSnapshot[ListView[T]] = {
-
-    legacySnapshot match {
-      case correctSnapshot: ListViewSerializerSnapshot[T] =>
-        correctSnapshot
-
-      case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] =>
-        // first, transform the incorrect list serializer's snapshot
-        // into a proper ListSerializerSnapshot
-        val transformedNestedListSerializerSnapshot = new ListSerializerSnapshot[T]
-        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
-          transformedNestedListSerializerSnapshot,
-          legacySnapshot.getSingleNestedSerializerAndConfig.f1)
-
-        // then, wrap the transformed ListSerializerSnapshot
-        // as a nested snapshot in the final resulting ListViewSerializerSnapshot
-        val transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot[T]()
-        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
-          transformedListViewSerializerSnapshot,
-          transformedNestedListSerializerSnapshot)
-
-        transformedListViewSerializerSnapshot
-    }
-  }
-
-  def getListSerializer: TypeSerializer[java.util.List[T]] = listSerializer
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
deleted file mode 100644
index a10b675..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.ListSerializer
-import org.apache.flink.table.api.dataview.ListView
-
-/**
-  * [[ListView]] type information.
-  *
-  * @param elementType element type information
-  * @tparam T element type
-  */
-class  ListViewTypeInfo[T](val elementType: TypeInformation[T])
-  extends TypeInformation[ListView[T]] {
-
-  override def isBasicType: Boolean = false
-
-  override def isTupleType: Boolean = false
-
-  override def getArity: Int = 1
-
-  override def getTotalFields: Int = 1
-
-  override def getTypeClass: Class[ListView[T]] = classOf[ListView[T]]
-
-  override def isKeyType: Boolean = false
-
-  override def createSerializer(config: ExecutionConfig): TypeSerializer[ListView[T]] = {
-    val typeSer = elementType.createSerializer(config)
-    new ListViewSerializer[T](new ListSerializer[T](typeSer))
-  }
-
-  override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass
-
-  override def hashCode(): Int = 31 * elementType.hashCode
-
-  override def equals(obj: Any): Boolean = canEqual(obj) && {
-    obj match {
-      case other: ListViewTypeInfo[T] =>
-        elementType.equals(other.elementType)
-      case _ => false
-    }
-  }
-
-  override def toString: String = s"ListView<$elementType>"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala
deleted file mode 100644
index eda6cb9..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import java.lang.reflect.Type
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.api.dataview.ListView
-
-class ListViewTypeInfoFactory[T] extends TypeInfoFactory[ListView[T]] {
-
-  override def createTypeInfo(
-      t: Type,
-      genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[ListView[T]] = {
-
-    var elementType = genericParameters.get("T")
-
-    if (elementType == null) {
-      // we might can get the elementType later from the ListView constructor
-      elementType = new GenericTypeInfo(classOf[Any])
-    }
-
-    new ListViewTypeInfo[T](elementType.asInstanceOf[TypeInformation[T]])
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
deleted file mode 100644
index 9947c35..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.common.typeutils.base.{MapSerializerConfigSnapshot, MapSerializerSnapshot}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.table.api.dataview.MapView
-
-/**
-  * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
-  * serializer for the serialization of the map's key-value pairs.
-  *
-  * The serialization format for the map is as follows: four bytes for the length of the map,
-  * followed by the serialized representation of each key-value pair. To allow null values,
-  * each value is prefixed by a null marker.
-  *
-  * @param mapSerializer Map serializer.
-  * @tparam K The type of the keys in the map.
-  * @tparam V The type of the values in the map.
-  */
-@SerialVersionUID(-9007142882049098705L)
-class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, V]])
-  extends TypeSerializer[MapView[K, V]]
-  with LegacySerializerSnapshotTransformer[MapView[K, V]] {
-
-  override def isImmutableType: Boolean = false
-
-  override def duplicate(): TypeSerializer[MapView[K, V]] =
-    new MapViewSerializer[K, V](mapSerializer.duplicate())
-
-  override def createInstance(): MapView[K, V] = {
-    new MapView[K, V]()
-  }
-
-  override def copy(from: MapView[K, V]): MapView[K, V] = {
-    new MapView[K, V](null, null, mapSerializer.copy(from.map))
-  }
-
-  override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, V] = copy(from)
-
-  override def getLength: Int = -1  // var length
-
-  override def serialize(record: MapView[K, V], target: DataOutputView): Unit = {
-    mapSerializer.serialize(record.map, target)
-  }
-
-  override def deserialize(source: DataInputView): MapView[K, V] = {
-    new MapView[K, V](null, null, mapSerializer.deserialize(source))
-  }
-
-  override def deserialize(reuse: MapView[K, V], source: DataInputView): MapView[K, V] =
-    deserialize(source)
-
-  override def copy(source: DataInputView, target: DataOutputView): Unit =
-    mapSerializer.copy(source, target)
-
-  override def hashCode(): Int = mapSerializer.hashCode()
-
-  override def equals(obj: Any): Boolean =
-    mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
-
-  override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
-    new MapViewSerializerSnapshot[K, V](this)
-
-  /**
-    * We need to override this as a [[LegacySerializerSnapshotTransformer]]
-    * because in Flink 1.6.x and below, this serializer was incorrectly returning
-    * directly the snapshot of the nested map serializer as its own snapshot.
-    *
-    * <p>This method transforms the incorrect map serializer snapshot
-    * to be a proper [[MapViewSerializerSnapshot]].
-    */
-  override def transformLegacySerializerSnapshot[U](
-      legacySnapshot: TypeSerializerSnapshot[U]
-  ): TypeSerializerSnapshot[MapView[K, V]] = {
-
-    legacySnapshot match {
-      case correctSnapshot: MapViewSerializerSnapshot[K, V] =>
-        correctSnapshot
-
-      case legacySnapshot: MapSerializerConfigSnapshot[K, V] =>
-        // first, transform the incorrect map serializer's snapshot
-        // into a proper ListSerializerSnapshot
-        val transformedNestedMapSerializerSnapshot =
-          new MapSerializerSnapshot[K, V]
-        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
-          transformedNestedMapSerializerSnapshot,
-          legacySnapshot.getNestedSerializersAndConfigs.get(0).f1,
-          legacySnapshot.getNestedSerializersAndConfigs.get(1).f1
-        )
-
-        // then, wrap the transformed MapSerializerSnapshot
-        // as a nested snapshot in the final resulting MapViewSerializerSnapshot
-        val transformedMapViewSerializerSnapshot =
-          new MapViewSerializerSnapshot[K, V]()
-        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
-          transformedMapViewSerializerSnapshot,
-          transformedNestedMapSerializerSnapshot
-        )
-
-        transformedMapViewSerializerSnapshot
-    }
-  }
-
-  def getMapSerializer: TypeSerializer[java.util.Map[K, V]] = mapSerializer
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
deleted file mode 100644
index ec5c222..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.MapSerializer
-import org.apache.flink.table.api.dataview.MapView
-
-/**
-  * [[MapView]] type information.
-  *
-  * @param keyType key type information
-  * @param valueType value type information
-  * @tparam K key type
-  * @tparam V value type
-  */
-class  MapViewTypeInfo[K, V](
-    val keyType: TypeInformation[K],
-    val valueType: TypeInformation[V])
-  extends TypeInformation[MapView[K, V]] {
-
-  override def isBasicType = false
-
-  override def isTupleType = false
-
-  override def getArity = 1
-
-  override def getTotalFields = 1
-
-  override def getTypeClass: Class[MapView[K, V]] = classOf[MapView[K, V]]
-
-  override def isKeyType: Boolean = false
-
-  override def createSerializer(config: ExecutionConfig): TypeSerializer[MapView[K, V]] = {
-    val keySer = keyType.createSerializer(config)
-    val valueSer = valueType.createSerializer(config)
-    new MapViewSerializer[K, V](new MapSerializer[K, V](keySer, valueSer))
-  }
-
-  override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass
-
-  override def hashCode(): Int = 31 * keyType.hashCode + valueType.hashCode
-
-  override def equals(obj: Any): Boolean = canEqual(obj) && {
-    obj match {
-      case other: MapViewTypeInfo[_, _] =>
-        keyType.equals(other.keyType) &&
-          valueType.equals(other.valueType)
-      case _ => false
-    }
-  }
-
-  override def toString: String = s"MapView<$keyType, $valueType>"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala
deleted file mode 100644
index 33c3ffe..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import java.lang.reflect.Type
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.api.dataview.MapView
-
-class MapViewTypeInfoFactory[K, V] extends TypeInfoFactory[MapView[K, V]] {
-
-  override def createTypeInfo(
-      t: Type,
-      genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[MapView[K, V]] = {
-
-    var keyType = genericParameters.get("K")
-    var valueType = genericParameters.get("V")
-
-    if (keyType == null) {
-      // we might be able to get the keyType later from the MapView constructor
-      keyType = new GenericTypeInfo(classOf[Any])
-    }
-
-    if (valueType == null) {
-      // we might be able to get the valueType later from the MapView constructor
-      valueType = new GenericTypeInfo(classOf[Any])
-    }
-
-    new MapViewTypeInfo[K, V](
-      keyType.asInstanceOf[TypeInformation[K]],
-      valueType.asInstanceOf[TypeInformation[V]])
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
index 5186d66..42ce9a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.functions.aggfunctions
 
 import java.lang.{Iterable => JIterable}
 import java.util
-
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.table.api.dataview.MapView
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f757189..cdb2479 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,16 +19,6 @@
 
 package org.apache.flink.table.functions.utils
 
-import java.lang.reflect.{Method, Modifier}
-import java.lang.{Integer => JInt, Long => JLong}
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import com.google.common.primitives.Primitives
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator}
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -36,11 +26,21 @@ import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtract
 import org.apache.flink.table.api.dataview._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.dataview._
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
 import org.apache.flink.table.functions._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.table.typeutils.FieldInfoUtils
-import org.apache.flink.util.InstantiationUtil
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator}
+
+import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
+import java.sql.{Date, Time, Timestamp}
+import java.util
 
 import scala.collection.mutable
 
@@ -474,8 +474,8 @@ object UserDefinedFunctionUtils {
             case map: MapViewTypeInfo[_, _] =>
               val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
               if (mapView != null) {
-                val keyTypeInfo = mapView.keyTypeInfo
-                val valueTypeInfo = mapView.valueTypeInfo
+                val keyTypeInfo = mapView.keyType
+                val valueTypeInfo = mapView.valueType
                 val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
                   new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
                 } else {
@@ -499,7 +499,7 @@ object UserDefinedFunctionUtils {
             case list: ListViewTypeInfo[_] =>
               val listView = field.get(acc).asInstanceOf[ListView[_]]
               if (listView != null) {
-                val elementTypeInfo = listView.elementTypeInfo
+                val elementTypeInfo = listView.elementType
                 val newTypeInfo = if (elementTypeInfo != null) {
                   new ListViewTypeInfo(elementTypeInfo)
                 } else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
index 3f70bce..227551e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.dataview
 
 import java.lang.Long
 import java.util.Random
-
 import org.apache.flink.api.common.typeutils.base.{ListSerializer, LongSerializer}
 import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
 import org.apache.flink.table.api.dataview.ListView
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
index 15f9b02..03ab7f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.dataview
 
 import java.lang.Long
 import java.util.Random
-
 import org.apache.flink.api.common.typeutils.base.{LongSerializer, MapSerializer, StringSerializer}
 import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
 import org.apache.flink.table.api.dataview.MapView
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
index 5b79e0f..d215a03 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 
 /**
  * Default implementation of StateDataViewStore that currently forwards state registration
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
index ec0f675..fcfeb68 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 
 /**
  * An implementation of StateDataViewStore for window aggregates which forward the state
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
index 11abeb8..aa1fb2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.dataview;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 
 /**
  * This interface contains methods for registering {@link StateDataView} with a managed store.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index 26d4dec..ecdbaa3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -39,7 +40,6 @@ import org.apache.flink.table.typeutils.BaseRowTypeInfo;
 import org.apache.flink.table.typeutils.BigDecimalTypeInfo;
 import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;