You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/23 13:53:47 UTC
[flink] branch release-1.9 updated: [FLINK-13266][table] Port
DataView related classes to flink-table-common
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 1aa011f [FLINK-13266][table] Port DataView related classes to flink-table-common
1aa011f is described below
commit 1aa011f03aec70df146dfa4feb27cc80a74c9dbf
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 18 15:10:00 2019 +0800
[FLINK-13266][table] Port DataView related classes to flink-table-common
---
.../apache/flink/table/api/dataview/DataView.java | 3 +
.../apache/flink/table/api/dataview/ListView.java | 10 +-
.../apache/flink/table/api/dataview/MapView.java | 10 +-
.../flink/table/dataview}/ListViewSerializer.java | 60 ++++++-
.../table/dataview/ListViewSerializerSnapshot.java | 2 +
.../flink/table/dataview}/ListViewTypeInfo.java | 4 +-
.../table/dataview}/ListViewTypeInfoFactory.java | 4 +-
.../flink/table/dataview}/MapViewSerializer.java | 63 ++++++-
.../table/dataview/MapViewSerializerSnapshot.java | 2 +
.../flink/table/dataview}/MapViewTypeInfo.java | 4 +-
.../table/dataview}/MapViewTypeInfoFactory.java | 4 +-
.../table/dataview}/NullAwareMapSerializer.java | 4 +-
.../flink/table/dataview}/NullSerializer.java | 4 +-
.../FirstValueWithRetractAggFunction.java | 4 +-
.../LastValueWithRetractAggFunction.java | 4 +-
.../apache/flink/table/dataview/DataViewSpec.scala | 1 -
.../flink/table/plan/util/AggregateUtil.scala | 9 +-
.../apache/flink/table/api/dataview/DataView.scala | 35 ----
.../flink/table/api/dataview/DataViewSpec.scala | 5 +-
.../apache/flink/table/api/dataview/ListView.scala | 142 ---------------
.../apache/flink/table/api/dataview/MapView.scala | 198 ---------------------
.../flink/table/dataview/ListViewSerializer.scala | 117 ------------
.../flink/table/dataview/ListViewTypeInfo.scala | 66 -------
.../table/dataview/ListViewTypeInfoFactory.scala | 43 -----
.../flink/table/dataview/MapViewSerializer.scala | 123 -------------
.../flink/table/dataview/MapViewTypeInfo.scala | 72 --------
.../table/dataview/MapViewTypeInfoFactory.scala | 51 ------
.../aggfunctions/CollectAggFunction.scala | 1 -
.../functions/utils/UserDefinedFunctionUtils.scala | 30 ++--
.../table/dataview/ListViewSerializerTest.scala | 1 -
.../table/dataview/MapViewSerializerTest.scala | 1 -
.../table/dataview/PerKeyStateDataViewStore.java | 2 -
.../dataview/PerWindowStateDataViewStore.java | 2 -
.../flink/table/dataview/StateDataViewStore.java | 2 -
.../table/types/TypeInfoDataTypeConverter.java | 2 +-
35 files changed, 171 insertions(+), 914 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
index 487ef79..8fe6aaf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.api.dataview;
+import org.apache.flink.annotation.PublicEvolving;
+
import java.io.Serializable;
/**
@@ -27,6 +29,7 @@ import java.io.Serializable;
* <p>Depending on the context in which the {@code AggregateFunction} is
* used, a {@link DataView} can be backed by a Java heap collection or a state backend.
*/
+@PublicEvolving
public interface DataView extends Serializable {
/**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
index 5f34e72..e78b09e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
@@ -18,9 +18,10 @@
package org.apache.flink.table.api.dataview;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.typeutils.ListViewTypeInfoFactory;
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory;
import java.util.ArrayList;
import java.util.List;
@@ -75,6 +76,7 @@ import java.util.Objects;
* }</pre>
*/
@TypeInfo(ListViewTypeInfoFactory.class)
+@PublicEvolving
public class ListView<T> implements DataView {
private static final long serialVersionUID = 5502298766901215388L;
@@ -106,7 +108,7 @@ public class ListView<T> implements DataView {
* Returns an iterable of the list view.
*
* @throws Exception Thrown if the system cannot get data.
- * @return The iterable of the list or { @code null} if the list is empty.
+ * @return The iterable of the list.
*/
public Iterable<T> get() throws Exception {
return list;
@@ -167,8 +169,4 @@ public class ListView<T> implements DataView {
return Objects.hash(elementType, list);
}
- @Override
- public String toString() {
- return "ListView" + list.toString();
- }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
similarity index 96%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index aa476be..7feb07b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -18,9 +18,10 @@
package org.apache.flink.table.api.dataview;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.typeutils.MapViewTypeInfoFactory;
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory;
import java.util.HashMap;
import java.util.Iterator;
@@ -76,6 +77,7 @@ import java.util.Map;
*
*/
@TypeInfo(MapViewTypeInfoFactory.class)
+@PublicEvolving
public class MapView<K, V> implements DataView {
private static final long serialVersionUID = -6185595470714822744L;
@@ -108,7 +110,7 @@ public class MapView<K, V> implements DataView {
}
/**
- * Return the value for the specified key or { @code null } if the key is not in the map view.
+ * Return the value for the specified key or {@code null} if the key is not in the map view.
*
* @param key The look up key.
* @return The value for the specified key.
@@ -223,8 +225,4 @@ public class MapView<K, V> implements DataView {
return map.hashCode();
}
- @Override
- public String toString() {
- return "MapView" + map.toString();
- }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
similarity index 54%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
index 9eb8af2..b4a3445 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java
@@ -16,10 +16,15 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.api.dataview.ListView;
@@ -36,9 +41,12 @@ import java.util.List;
*
* @param <T> The type of element in the list.
*/
-public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
+@Internal
+public class ListViewSerializer<T>
+ extends TypeSerializer<ListView<T>>
+ implements LegacySerializerSnapshotTransformer<ListView<T>> {
- private static final long serialVersionUID = 3272986300876096397L;
+ private static final long serialVersionUID = -2030398712359267867L;
private final TypeSerializer<List<T>> listSerializer;
@@ -63,7 +71,7 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
@Override
public ListView<T> copy(ListView<T> from) {
- return new ListView<>(null, from.list);
+ return new ListView<>(null, listSerializer.copy(from.list));
}
@Override
@@ -113,6 +121,48 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> {
@Override
public TypeSerializerSnapshot<ListView<T>> snapshotConfiguration() {
- throw new UnsupportedOperationException();
+ return new ListViewSerializerSnapshot<>(this);
}
+
+ /**
+ * We need to override this as a {@link LegacySerializerSnapshotTransformer}
+ * because in Flink 1.6.x and below, this serializer was incorrectly returning
+ * directly the snapshot of the nested list serializer as its own snapshot.
+ *
+ * <p>This method transforms the incorrect list serializer snapshot
+ * to be a proper {@link ListViewSerializerSnapshot}.
+ */
+ @Override
+ public <U> TypeSerializerSnapshot<ListView<T>> transformLegacySerializerSnapshot(
+ TypeSerializerSnapshot<U> legacySnapshot) {
+ if (legacySnapshot instanceof ListViewSerializerSnapshot) {
+ return (TypeSerializerSnapshot<ListView<T>>) legacySnapshot;
+ } else if (legacySnapshot instanceof CollectionSerializerConfigSnapshot) {
+ // first, transform the incorrect list serializer's snapshot
+ // into a proper ListSerializerSnapshot
+ ListSerializerSnapshot<T> transformedNestedListSerializerSnapshot = new ListSerializerSnapshot<>();
+ CollectionSerializerConfigSnapshot<List<T>, T> snapshot =
+ (CollectionSerializerConfigSnapshot<List<T>, T>) legacySnapshot;
+ CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+ transformedNestedListSerializerSnapshot,
+ (TypeSerializerSnapshot<?>) (snapshot.getSingleNestedSerializerAndConfig().f1));
+
+ // then, wrap the transformed ListSerializerSnapshot
+ // as a nested snapshot in the final resulting ListViewSerializerSnapshot
+ ListViewSerializerSnapshot<T> transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot<>();
+ CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+ transformedListViewSerializerSnapshot,
+ transformedNestedListSerializerSnapshot);
+
+ return transformedListViewSerializerSnapshot;
+ } else {
+ throw new UnsupportedOperationException(
+ legacySnapshot.getClass().getCanonicalName() + " is not supported.");
+ }
+ }
+
+ public TypeSerializer<List<T>> getListSerializer() {
+ return listSerializer;
+ }
+
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index a130271..72547fd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -30,6 +31,7 @@ import java.util.List;
*
* @param <T> the type of the list elements.
*/
+@Internal
public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer<T>> {
private static final int CURRENT_VERSION = 1;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
similarity index 97%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
index f0c71a4..951b140 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.table.api.dataview.ListView;
*
* @param <T> element type
*/
+@Internal
public class ListViewTypeInfo<T> extends TypeInformation<ListView<T>> {
private static final long serialVersionUID = 6468505781419989441L;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
similarity index 94%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
index 7dc9227..9aef459 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -29,6 +30,7 @@ import java.util.Map;
/**
* TypeInformation factory for {@link ListView}.
*/
+@Internal
public class ListViewTypeInfoFactory<T> extends TypeInfoFactory<ListView<T>> {
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
similarity index 55%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
index 3f697d6..b34b698 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java
@@ -16,10 +16,15 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.api.dataview.MapView;
@@ -38,8 +43,12 @@ import java.util.Map;
* @param <K> The type of the keys in the map.
* @param <V> The type of the values in the map.
*/
-public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
- private static final long serialVersionUID = 202624224378185203L;
+@Internal
+public class MapViewSerializer<K, V>
+ extends TypeSerializer<MapView<K, V>>
+ implements LegacySerializerSnapshotTransformer<MapView<K, V>> {
+
+ private static final long serialVersionUID = -9007142882049098705L;
private final TypeSerializer<Map<K, V>> mapSerializer;
@@ -54,7 +63,7 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
@Override
public TypeSerializer<MapView<K, V>> duplicate() {
- return new MapViewSerializer<>(mapSerializer);
+ return new MapViewSerializer<>(mapSerializer.duplicate());
}
@Override
@@ -114,6 +123,50 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> {
@Override
public TypeSerializerSnapshot<MapView<K, V>> snapshotConfiguration() {
- throw new UnsupportedOperationException();
+ return new MapViewSerializerSnapshot<>(this);
+ }
+
+ /**
+ * We need to override this as a {@link LegacySerializerSnapshotTransformer}
+ * because in Flink 1.6.x and below, this serializer was incorrectly returning
+ * directly the snapshot of the nested map serializer as its own snapshot.
+ *
+ * <p>This method transforms the incorrect map serializer snapshot
+ * to be a proper {@link MapViewSerializerSnapshot}.
+ */
+ @Override
+ public <U> TypeSerializerSnapshot<MapView<K, V>> transformLegacySerializerSnapshot(
+ TypeSerializerSnapshot<U> legacySnapshot) {
+ if (legacySnapshot instanceof MapViewSerializerSnapshot) {
+ return (TypeSerializerSnapshot<MapView<K, V>>) legacySnapshot;
+ } else if (legacySnapshot instanceof MapSerializerConfigSnapshot) {
+ // first, transform the incorrect map serializer's snapshot
+ // into a proper MapSerializerSnapshot
+ MapSerializerSnapshot<K, V> transformedNestedMapSerializerSnapshot = new MapSerializerSnapshot<>();
+ MapSerializerConfigSnapshot<K, V> snapshot = (MapSerializerConfigSnapshot<K, V>) legacySnapshot;
+ CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+ transformedNestedMapSerializerSnapshot,
+ snapshot.getNestedSerializersAndConfigs().get(0).f1,
+ snapshot.getNestedSerializersAndConfigs().get(1).f1
+ );
+
+ // then, wrap the transformed MapSerializerSnapshot
+ // as a nested snapshot in the final resulting MapViewSerializerSnapshot
+ MapViewSerializerSnapshot<K, V> transformedMapViewSerializerSnapshot = new MapViewSerializerSnapshot<>();
+ CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+ transformedMapViewSerializerSnapshot,
+ transformedNestedMapSerializerSnapshot
+ );
+
+ return transformedMapViewSerializerSnapshot;
+ } else {
+ throw new UnsupportedOperationException(
+ legacySnapshot.getClass().getCanonicalName() + " is not supported.");
+ }
+ }
+
+ public TypeSerializer<Map<K, V>> getMapSerializer() {
+ return mapSerializer;
}
+
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index 618cf5c..8b58449 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -31,6 +32,7 @@ import java.util.Map;
* @param <K> the key type of the map entries.
* @param <V> the value type of the map entries.
*/
+@Internal
public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer<K, V>> {
private static final int CURRENT_VERSION = 1;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
similarity index 97%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
index b9ee0b2..af40f28 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -32,6 +33,7 @@ import java.util.Objects;
* @param <K> key type
* @param <V> value type
*/
+@Internal
public class MapViewTypeInfo<K, V> extends TypeInformation<MapView<K, V>> {
private static final long serialVersionUID = -2883944144965318259L;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
index fa8abdb..294806d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@@ -29,6 +30,7 @@ import java.util.Map;
/**
* TypeInformation factory for {@link MapView}.
*/
+@Internal
public class MapViewTypeInfoFactory<K, V> extends TypeInfoFactory<MapView<K, V>> {
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
similarity index 98%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
index be08836..dc0b691 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
@@ -31,6 +32,7 @@ import java.util.Map;
* The {@link NullAwareMapSerializer} is similar to MapSerializer, the only difference is that
* the {@link NullAwareMapSerializer} can handle null keys.
*/
+@Internal
public class NullAwareMapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
private static final long serialVersionUID = 5363147328373166590L;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
similarity index 95%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
index 9d20f8e..8964d95 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataview;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
@@ -28,6 +29,7 @@ import java.io.IOException;
/**
* A serializer for null.
*/
+@Internal
public class NullSerializer extends TypeSerializerSingleton<Object> {
private static final long serialVersionUID = -5381596724707742625L;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
index 344e797..78cb668 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataview.MapViewSerializer;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer;
import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalSerializer;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewSerializer;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
import java.util.ArrayList;
import java.util.Iterator;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
index d07acfb..f6df480 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataview.MapViewSerializer;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer;
import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalSerializer;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewSerializer;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
import java.util.ArrayList;
import java.util.Iterator;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
index 9698758..7872664 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.dataview
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.typeutils.{ListViewTypeInfo, MapViewTypeInfo}
/**
* Data view specification.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 31784aa..933ec38 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -19,12 +19,12 @@ package org.apache.flink.table.plan.util
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.JLong
-import org.apache.flink.table.api.{DataTypes, TableConfig, ExecutionConfigOptions, TableException}
+import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
-import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec}
+import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec, MapViewTypeInfo}
import org.apache.flink.table.expressions.ExpressionUtils.extractValue
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
@@ -42,8 +42,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
import org.apache.flink.table.types.logical.{LogicalTypeRoot, _}
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.typeutils.{BinaryStringTypeInfo, MapViewTypeInfo}
-
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo
import org.apache.calcite.rel.`type`._
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rex.RexInputRef
@@ -51,8 +50,6 @@ import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.validate.SqlMonotonicity
import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
import org.apache.calcite.tools.RelBuilder
-
-import java.lang.{Long => JLong}
import java.time.Duration
import java.util
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala
deleted file mode 100644
index 2214086..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-/**
- * A [[DataView]] is a collection type that can be used in the accumulator of an
- * [[org.apache.flink.table.functions.AggregateFunction]].
- *
- * Depending on the context in which the [[org.apache.flink.table.functions.AggregateFunction]] is
- * used, a [[DataView]] can be backed by a Java heap collection or a state backend.
- */
-trait DataView extends Serializable {
-
- /**
- * Clears the [[DataView]] and removes all data.
- */
- def clear(): Unit
-
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
index 943fe03..c309bf1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.api.dataview
import java.lang.reflect.Field
-
import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, State, StateDescriptor}
import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
@@ -41,7 +40,7 @@ case class ListViewSpec[T](
extends DataViewSpec[ListView[T]] {
override def toStateDescriptor: StateDescriptor[_ <: State, _] =
- new ListStateDescriptor[T](stateId, listViewTypeInfo.elementType)
+ new ListStateDescriptor[T](stateId, listViewTypeInfo.getElementType)
}
case class MapViewSpec[K, V](
@@ -51,5 +50,5 @@ case class MapViewSpec[K, V](
extends DataViewSpec[MapView[K, V]] {
override def toStateDescriptor: StateDescriptor[_ <: State, _] =
- new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.keyType, mapViewTypeInfo.valueType)
+ new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.getKeyType, mapViewTypeInfo.getValueType)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
deleted file mode 100644
index 59b2426..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-import java.lang.{Iterable => JIterable}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
-import org.apache.flink.table.dataview.ListViewTypeInfoFactory
-
-/**
- * A [[ListView]] provides List functionality for accumulators used by user-defined aggregate
- * functions [[org.apache.flink.api.common.functions.AggregateFunction]].
- *
- * A [[ListView]] can be backed by a Java ArrayList or a state backend, depending on the context in
- * which the aggregate function is used.
- *
- * At runtime [[ListView]] will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
- * if it is backed by a state backend.
- *
- * Example of an accumulator type with a [[ListView]] and an aggregate function that uses it:
- * {{{
- *
- * public class MyAccum {
- * public ListView<String> list;
- * public long count;
- * }
- *
- * public class MyAgg extends AggregateFunction<Long, MyAccum> {
- *
- * @Override
- * public MyAccum createAccumulator() {
- * MyAccum accum = new MyAccum();
- * accum.list = new ListView<>(Types.STRING);
- * accum.count = 0L;
- * return accum;
- * }
- *
- * public void accumulate(MyAccum accumulator, String id) {
- * accumulator.list.add(id);
- * ... ...
- * accumulator.list.get()
- * ... ...
- * }
- *
- * @Override
- * public Long getValue(MyAccum accumulator) {
- * accumulator.list.add(id);
- * ... ...
- * accumulator.list.get()
- * ... ...
- * return accumulator.count;
- * }
- * }
- *
- * }}}
- *
- * @param elementTypeInfo element type information
- * @tparam T element type
- */
-@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
-class ListView[T](
- @transient private[flink] val elementTypeInfo: TypeInformation[T],
- private[flink] val list: util.List[T])
- extends DataView {
-
- /**
- * Creates a list view for elements of the specified type.
- *
- * @param elementTypeInfo The type of the list view elements.
- */
- def this(elementTypeInfo: TypeInformation[T]) {
- this(elementTypeInfo, new util.ArrayList[T]())
- }
-
- /**
- * Creates a list view.
- */
- def this() = this(null)
-
- /**
- * Returns an iterable of the list view.
- *
- * @throws Exception Thrown if the system cannot get data.
- * @return The iterable of the list or { @code null} if the list is empty.
- */
- @throws[Exception]
- def get: JIterable[T] = {
- if (!list.isEmpty) {
- list
- } else {
- null
- }
- }
-
- /**
- * Adds the given value to the list.
- *
- * @throws Exception Thrown if the system cannot add data.
- * @param value The element to be appended to this list view.
- */
- @throws[Exception]
- def add(value: T): Unit = list.add(value)
-
- /**
- * Adds all of the elements of the specified list to this list view.
- *
- * @throws Exception Thrown if the system cannot add all data.
- * @param list The list with the elements that will be stored in this list view.
- */
- @throws[Exception]
- def addAll(list: util.List[T]): Unit = this.list.addAll(list)
-
- /**
- * Removes all of the elements from this list view.
- */
- override def clear(): Unit = list.clear()
-
- override def equals(other: Any): Boolean = other match {
- case that: ListView[T] =>
- list.equals(that.list)
- case _ => false
- }
-
- override def hashCode(): Int = list.hashCode()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
deleted file mode 100644
index 9206d6a..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.dataview
-
-import java.lang.{Iterable => JIterable}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
-import org.apache.flink.table.dataview.MapViewTypeInfoFactory
-
-/**
- * A [[MapView]] provides Map functionality for accumulators used by user-defined aggregate
- * functions [[org.apache.flink.table.functions.AggregateFunction]].
- *
- * A [[MapView]] can be backed by a Java HashMap or a state backend, depending on the context in
- * which the aggregation function is used.
- *
- * At runtime [[MapView]] will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
- * if it is backed by a state backend.
- *
- * Example of an accumulator type with a [[MapView]] and an aggregate function that uses it:
- * {{{
- *
- * public class MyAccum {
- * public MapView<String, Integer> map;
- * public long count;
- * }
- *
- * public class MyAgg extends AggregateFunction<Long, MyAccum> {
- *
- * @Override
- * public MyAccum createAccumulator() {
- * MyAccum accum = new MyAccum();
- * accum.map = new MapView<>(Types.STRING, Types.INT);
- * accum.count = 0L;
- * return accum;
- * }
- *
- * public void accumulate(MyAccum accumulator, String id) {
- * try {
- * if (!accumulator.map.contains(id)) {
- * accumulator.map.put(id, 1);
- * accumulator.count++;
- * }
- * } catch (Exception e) {
- * e.printStackTrace();
- * }
- * }
- *
- * @Override
- * public Long getValue(MyAccum accumulator) {
- * return accumulator.count;
- * }
- * }
- *
- * }}}
- *
- * @param keyTypeInfo key type information
- * @param valueTypeInfo value type information
- * @tparam K key type
- * @tparam V value type
- */
-@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
-class MapView[K, V](
- @transient private[flink] val keyTypeInfo: TypeInformation[K],
- @transient private[flink] val valueTypeInfo: TypeInformation[V],
- private[flink] val map: util.Map[K, V])
- extends DataView {
-
- /**
- * Creates a MapView with the specified key and value types.
- *
- * @param keyTypeInfo The type of keys of the MapView.
- * @param valueTypeInfo The type of the values of the MapView.
- */
- def this(keyTypeInfo: TypeInformation[K], valueTypeInfo: TypeInformation[V]) {
- this(keyTypeInfo, valueTypeInfo, new util.HashMap[K, V]())
- }
-
- /**
- * Creates a MapView.
- */
- def this() = this(null, null)
-
- /**
- * Return the value for the specified key or { @code null } if the key is not in the map view.
- *
- * @param key The look up key.
- * @return The value for the specified key.
- * @throws Exception Thrown if the system cannot get data.
- */
- @throws[Exception]
- def get(key: K): V = map.get(key)
-
- /**
- * Inserts a value for the given key into the map view.
- * If the map view already contains a value for the key, the existing value is overwritten.
- *
- * @param key The key for which the value is inserted.
- * @param value The value that is inserted for the key.
- * @throws Exception Thrown if the system cannot put data.
- */
- @throws[Exception]
- def put(key: K, value: V): Unit = map.put(key, value)
-
- /**
- * Inserts all mappings from the specified map to this map view.
- *
- * @param map The map whose entries are inserted into this map view.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
-
- /**
- * Deletes the value for the given key.
- *
- * @param key The key for which the value is deleted.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def remove(key: K): Unit = map.remove(key)
-
- /**
- * Checks if the map view contains a value for a given key.
- *
- * @param key The key to check.
- * @return True if there exists a value for the given key, false otherwise.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def contains(key: K): Boolean = map.containsKey(key)
-
- /**
- * Returns all entries of the map view.
- *
- * @return An iterable of all the key-value pairs in the map view.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def entries: JIterable[util.Map.Entry[K, V]] = map.entrySet()
-
- /**
- * Returns all the keys in the map view.
- *
- * @return An iterable of all the keys in the map.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def keys: JIterable[K] = map.keySet()
-
- /**
- * Returns all the values in the map view.
- *
- * @return An iterable of all the values in the map.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def values: JIterable[V] = map.values()
-
- /**
- * Returns an iterator over all entries of the map view.
- *
- * @return An iterator over all the mappings in the map.
- * @throws Exception Thrown if the system cannot access the map.
- */
- @throws[Exception]
- def iterator: util.Iterator[util.Map.Entry[K, V]] = map.entrySet().iterator()
-
- /**
- * Removes all entries of this map.
- */
- override def clear(): Unit = map.clear()
-
- override def equals(other: Any): Boolean = other match {
- case that: MapView[K, V] =>
- map.equals(that.map)
- case _ => false
- }
-
- override def hashCode(): Int = map.hashCode()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
deleted file mode 100644
index 75f9326..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializerSnapshot}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.table.api.dataview.ListView
-
-/**
- * A serializer for [[ListView]]. The serializer relies on an element
- * serializer for the serialization of the list's elements.
- *
- * The serialization format for the list is as follows: four bytes for the length of the list,
- * followed by the serialized representation of each element.
- *
- * @param listSerializer List serializer.
- * @tparam T The type of element in the list.
- */
-@SerialVersionUID(-2030398712359267867L)
-class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]])
- extends TypeSerializer[ListView[T]]
- with LegacySerializerSnapshotTransformer[ListView[T]] {
-
- override def isImmutableType: Boolean = false
-
- override def duplicate(): TypeSerializer[ListView[T]] = {
- new ListViewSerializer[T](listSerializer.duplicate())
- }
-
- override def createInstance(): ListView[T] = {
- new ListView[T]
- }
-
- override def copy(from: ListView[T]): ListView[T] = {
- new ListView[T](null, listSerializer.copy(from.list))
- }
-
- override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from)
-
- override def getLength: Int = -1
-
- override def serialize(record: ListView[T], target: DataOutputView): Unit = {
- listSerializer.serialize(record.list, target)
- }
-
- override def deserialize(source: DataInputView): ListView[T] = {
- new ListView[T](null, listSerializer.deserialize(source))
- }
-
- override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] =
- deserialize(source)
-
- override def copy(source: DataInputView, target: DataOutputView): Unit =
- listSerializer.copy(source, target)
-
- override def hashCode(): Int = listSerializer.hashCode()
-
- override def equals(obj: Any): Boolean =
- listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
-
- override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
- new ListViewSerializerSnapshot[T](this)
-
- /**
- * We need to override this as a [[LegacySerializerSnapshotTransformer]]
- * because in Flink 1.6.x and below, this serializer was incorrectly returning
- * directly the snapshot of the nested list serializer as its own snapshot.
- *
- * <p>This method transforms the incorrect list serializer snapshot
- * to be a proper [[ListViewSerializerSnapshot]].
- */
- override def transformLegacySerializerSnapshot[U](
- legacySnapshot: TypeSerializerSnapshot[U]
- ): TypeSerializerSnapshot[ListView[T]] = {
-
- legacySnapshot match {
- case correctSnapshot: ListViewSerializerSnapshot[T] =>
- correctSnapshot
-
- case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] =>
- // first, transform the incorrect list serializer's snapshot
- // into a proper ListSerializerSnapshot
- val transformedNestedListSerializerSnapshot = new ListSerializerSnapshot[T]
- CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
- transformedNestedListSerializerSnapshot,
- legacySnapshot.getSingleNestedSerializerAndConfig.f1)
-
- // then, wrap the transformed ListSerializerSnapshot
- // as a nested snapshot in the final resulting ListViewSerializerSnapshot
- val transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot[T]()
- CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
- transformedListViewSerializerSnapshot,
- transformedNestedListSerializerSnapshot)
-
- transformedListViewSerializerSnapshot
- }
- }
-
- def getListSerializer: TypeSerializer[java.util.List[T]] = listSerializer
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
deleted file mode 100644
index a10b675..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.ListSerializer
-import org.apache.flink.table.api.dataview.ListView
-
-/**
- * [[ListView]] type information.
- *
- * @param elementType element type information
- * @tparam T element type
- */
-class ListViewTypeInfo[T](val elementType: TypeInformation[T])
- extends TypeInformation[ListView[T]] {
-
- override def isBasicType: Boolean = false
-
- override def isTupleType: Boolean = false
-
- override def getArity: Int = 1
-
- override def getTotalFields: Int = 1
-
- override def getTypeClass: Class[ListView[T]] = classOf[ListView[T]]
-
- override def isKeyType: Boolean = false
-
- override def createSerializer(config: ExecutionConfig): TypeSerializer[ListView[T]] = {
- val typeSer = elementType.createSerializer(config)
- new ListViewSerializer[T](new ListSerializer[T](typeSer))
- }
-
- override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass
-
- override def hashCode(): Int = 31 * elementType.hashCode
-
- override def equals(obj: Any): Boolean = canEqual(obj) && {
- obj match {
- case other: ListViewTypeInfo[T] =>
- elementType.equals(other.elementType)
- case _ => false
- }
- }
-
- override def toString: String = s"ListView<$elementType>"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala
deleted file mode 100644
index eda6cb9..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import java.lang.reflect.Type
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.api.dataview.ListView
-
-class ListViewTypeInfoFactory[T] extends TypeInfoFactory[ListView[T]] {
-
- override def createTypeInfo(
- t: Type,
- genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[ListView[T]] = {
-
- var elementType = genericParameters.get("T")
-
- if (elementType == null) {
- // we might be able to get the elementType later from the ListView constructor
- elementType = new GenericTypeInfo(classOf[Any])
- }
-
- new ListViewTypeInfo[T](elementType.asInstanceOf[TypeInformation[T]])
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
deleted file mode 100644
index 9947c35..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.common.typeutils.base.{MapSerializerConfigSnapshot, MapSerializerSnapshot}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.table.api.dataview.MapView
-
-/**
- * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
- * serializer for the serialization of the map's key-value pairs.
- *
- * The serialization format for the map is as follows: four bytes for the length of the map,
- * followed by the serialized representation of each key-value pair. To allow null values,
- * each value is prefixed by a null marker.
- *
- * @param mapSerializer Map serializer.
- * @tparam K The type of the keys in the map.
- * @tparam V The type of the values in the map.
- */
-@SerialVersionUID(-9007142882049098705L)
-class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, V]])
- extends TypeSerializer[MapView[K, V]]
- with LegacySerializerSnapshotTransformer[MapView[K, V]] {
-
- override def isImmutableType: Boolean = false
-
- override def duplicate(): TypeSerializer[MapView[K, V]] =
- new MapViewSerializer[K, V](mapSerializer.duplicate())
-
- override def createInstance(): MapView[K, V] = {
- new MapView[K, V]()
- }
-
- override def copy(from: MapView[K, V]): MapView[K, V] = {
- new MapView[K, V](null, null, mapSerializer.copy(from.map))
- }
-
- override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, V] = copy(from)
-
- override def getLength: Int = -1 // var length
-
- override def serialize(record: MapView[K, V], target: DataOutputView): Unit = {
- mapSerializer.serialize(record.map, target)
- }
-
- override def deserialize(source: DataInputView): MapView[K, V] = {
- new MapView[K, V](null, null, mapSerializer.deserialize(source))
- }
-
- override def deserialize(reuse: MapView[K, V], source: DataInputView): MapView[K, V] =
- deserialize(source)
-
- override def copy(source: DataInputView, target: DataOutputView): Unit =
- mapSerializer.copy(source, target)
-
- override def hashCode(): Int = mapSerializer.hashCode()
-
- override def equals(obj: Any): Boolean =
- mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
-
- override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
- new MapViewSerializerSnapshot[K, V](this)
-
- /**
- * We need to override this as a [[LegacySerializerSnapshotTransformer]]
- * because in Flink 1.6.x and below, this serializer was incorrectly returning
- * directly the snapshot of the nested map serializer as its own snapshot.
- *
- * <p>This method transforms the incorrect map serializer snapshot
- * to be a proper [[MapViewSerializerSnapshot]].
- */
- override def transformLegacySerializerSnapshot[U](
- legacySnapshot: TypeSerializerSnapshot[U]
- ): TypeSerializerSnapshot[MapView[K, V]] = {
-
- legacySnapshot match {
- case correctSnapshot: MapViewSerializerSnapshot[K, V] =>
- correctSnapshot
-
- case legacySnapshot: MapSerializerConfigSnapshot[K, V] =>
- // first, transform the incorrect map serializer's snapshot
- // into a proper MapSerializerSnapshot
- val transformedNestedMapSerializerSnapshot =
- new MapSerializerSnapshot[K, V]
- CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
- transformedNestedMapSerializerSnapshot,
- legacySnapshot.getNestedSerializersAndConfigs.get(0).f1,
- legacySnapshot.getNestedSerializersAndConfigs.get(1).f1
- )
-
- // then, wrap the transformed MapSerializerSnapshot
- // as a nested snapshot in the final resulting MapViewSerializerSnapshot
- val transformedMapViewSerializerSnapshot =
- new MapViewSerializerSnapshot[K, V]()
- CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
- transformedMapViewSerializerSnapshot,
- transformedNestedMapSerializerSnapshot
- )
-
- transformedMapViewSerializerSnapshot
- }
- }
-
- def getMapSerializer: TypeSerializer[java.util.Map[K, V]] = mapSerializer
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
deleted file mode 100644
index ec5c222..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.MapSerializer
-import org.apache.flink.table.api.dataview.MapView
-
-/**
- * [[MapView]] type information.
- *
- * @param keyType key type information
- * @param valueType value type information
- * @tparam K key type
- * @tparam V value type
- */
-class MapViewTypeInfo[K, V](
- val keyType: TypeInformation[K],
- val valueType: TypeInformation[V])
- extends TypeInformation[MapView[K, V]] {
-
- override def isBasicType = false
-
- override def isTupleType = false
-
- override def getArity = 1
-
- override def getTotalFields = 1
-
- override def getTypeClass: Class[MapView[K, V]] = classOf[MapView[K, V]]
-
- override def isKeyType: Boolean = false
-
- override def createSerializer(config: ExecutionConfig): TypeSerializer[MapView[K, V]] = {
- val keySer = keyType.createSerializer(config)
- val valueSer = valueType.createSerializer(config)
- new MapViewSerializer[K, V](new MapSerializer[K, V](keySer, valueSer))
- }
-
- override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass
-
- override def hashCode(): Int = 31 * keyType.hashCode + valueType.hashCode
-
- override def equals(obj: Any): Boolean = canEqual(obj) && {
- obj match {
- case other: MapViewTypeInfo[_, _] =>
- keyType.equals(other.keyType) &&
- valueType.equals(other.valueType)
- case _ => false
- }
- }
-
- override def toString: String = s"MapView<$keyType, $valueType>"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala
deleted file mode 100644
index 33c3ffe..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview
-
-import java.lang.reflect.Type
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.table.api.dataview.MapView
-
-class MapViewTypeInfoFactory[K, V] extends TypeInfoFactory[MapView[K, V]] {
-
- override def createTypeInfo(
- t: Type,
- genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[MapView[K, V]] = {
-
- var keyType = genericParameters.get("K")
- var valueType = genericParameters.get("V")
-
- if (keyType == null) {
- // we might be able to get the keyType later from the MapView constructor
- keyType = new GenericTypeInfo(classOf[Any])
- }
-
- if (valueType == null) {
- // we might be able to get the valueType later from the MapView constructor
- valueType = new GenericTypeInfo(classOf[Any])
- }
-
- new MapViewTypeInfo[K, V](
- keyType.asInstanceOf[TypeInformation[K]],
- valueType.asInstanceOf[TypeInformation[V]])
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
index 5186d66..42ce9a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.functions.aggfunctions
import java.lang.{Iterable => JIterable}
import java.util
-
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils._
import org.apache.flink.table.api.dataview.MapView
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f757189..cdb2479 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,16 +19,6 @@
package org.apache.flink.table.functions.utils
-import java.lang.reflect.{Method, Modifier}
-import java.lang.{Integer => JInt, Long => JLong}
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import com.google.common.primitives.Primitives
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator}
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
@@ -36,11 +26,21 @@ import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtract
import org.apache.flink.table.api.dataview._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.dataview._
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
import org.apache.flink.table.functions._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
import org.apache.flink.table.typeutils.FieldInfoUtils
-import org.apache.flink.util.InstantiationUtil
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator}
+
+import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
+import java.sql.{Date, Time, Timestamp}
+import java.util
import scala.collection.mutable
@@ -474,8 +474,8 @@ object UserDefinedFunctionUtils {
case map: MapViewTypeInfo[_, _] =>
val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
if (mapView != null) {
- val keyTypeInfo = mapView.keyTypeInfo
- val valueTypeInfo = mapView.valueTypeInfo
+ val keyTypeInfo = mapView.keyType
+ val valueTypeInfo = mapView.valueType
val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
} else {
@@ -499,7 +499,7 @@ object UserDefinedFunctionUtils {
case list: ListViewTypeInfo[_] =>
val listView = field.get(acc).asInstanceOf[ListView[_]]
if (listView != null) {
- val elementTypeInfo = listView.elementTypeInfo
+ val elementTypeInfo = listView.elementType
val newTypeInfo = if (elementTypeInfo != null) {
new ListViewTypeInfo(elementTypeInfo)
} else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
index 3f70bce..227551e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.dataview
import java.lang.Long
import java.util.Random
-
import org.apache.flink.api.common.typeutils.base.{ListSerializer, LongSerializer}
import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
import org.apache.flink.table.api.dataview.ListView
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
index 15f9b02..03ab7f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.dataview
import java.lang.Long
import java.util.Random
-
import org.apache.flink.api.common.typeutils.base.{LongSerializer, MapSerializer, StringSerializer}
import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
import org.apache.flink.table.api.dataview.MapView
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
index 5b79e0f..d215a03 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
/**
* Default implementation of StateDataViewStore that currently forwards state registration
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
index ec0f675..fcfeb68 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
/**
* An implementation of StateDataViewStore for window aggregates which forward the state
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
index 11abeb8..aa1fb2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java
@@ -19,8 +19,6 @@
package org.apache.flink.table.dataview;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.table.typeutils.ListViewTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
/**
* This interface contains methods for registering {@link StateDataView} with a managed store.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index 26d4dec..ecdbaa3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataview.MapViewTypeInfo;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.types.logical.DecimalType;
@@ -39,7 +40,6 @@ import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.typeutils.BigDecimalTypeInfo;
import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
-import org.apache.flink.table.typeutils.MapViewTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;