Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/11 16:16:40 UTC
[flink] branch master updated: [FLINK-10170] [table] Add string representation for all Table & SQL API types
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ae5983 [FLINK-10170] [table] Add string representation for all Table & SQL API types
1ae5983 is described below
commit 1ae5983bc2b267ed7725338ef932505477aee7b8
Author: jerryjzhang <zh...@163.com>
AuthorDate: Sun Aug 19 22:27:01 2018 +0800
[FLINK-10170] [table] Add string representation for all Table & SQL API types
Since Flink 1.6, the recommended way of creating source/sink tables is
via connector/format/schema descriptors. This commit adds a string-based
representation for all types supported by the Table & SQL API,
using a syntax similar to that of Hive and other SQL projects.
This closes #6578.
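For illustration, a minimal Scala sketch of the round trip this enables
(readTypeInfo appears in the diff below; writeTypeInfo is assumed to be the
matching writer entry point of TypeStringUtils):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.typeutils.TypeStringUtils

    // parse the new angle-bracket syntax into Flink type information
    val parsed: TypeInformation[_] =
      TypeStringUtils.readTypeInfo("MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>")

    // build the same type programmatically; serializing it yields the string again
    val built = Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))

    parsed == built                      // true: both describe the same MapTypeInfo
    TypeStringUtils.writeTypeInfo(built) // assumed: "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>"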
---
docs/dev/table/connect.md | 21 +++--
docs/dev/table/sqlClient.md | 2 +-
.../test-scripts/test_sql_client.sh | 2 +-
.../apache/flink/table/descriptors/JsonTest.java | 2 +-
.../flink/table/typeutils/TypeStringUtils.scala | 98 ++++++++++++++++------
.../apache/flink/table/descriptors/CsvTest.scala | 4 +-
.../table/descriptors/TableDescriptorTest.scala | 30 ++++++-
.../table/typeutils/TypeStringUtilsTest.scala | 42 ++++++++--
8 files changed, 152 insertions(+), 49 deletions(-)
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 1bfff42..16649e5 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -417,13 +417,18 @@ DECIMAL
DATE
TIME
TIMESTAMP
-ROW(fieldtype, ...) # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
- # with indexed fields names f0, f1, ...
-ROW(fieldname fieldtype, ...) # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
- # is mapped to Flink's RowTypeInfo
-POJO(class) # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
-ANY(class) # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
-ANY(class, serialized) # used for type information that is not supported by Flink's Table & SQL API
+MAP<fieldtype, fieldtype> # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
+MULTISET<fieldtype> # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
+PRIMITIVE_ARRAY<fieldtype> # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
+OBJECT_ARRAY<fieldtype> # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
+ # Flink's ObjectArrayTypeInfo
+ROW<fieldtype, ...> # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
+ # with indexed field names f0, f1, ...
+ROW<fieldname fieldtype, ...> # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
+ # is mapped to Flink's RowTypeInfo
+POJO<class> # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
+ANY<class> # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
+ANY<class, serialized> # used for type information that is not supported by Flink's Table & SQL API
{% endhighlight %}
{% top %}
@@ -1046,4 +1051,4 @@ table.writeToSink(sink)
</div>
</div>
-{% top %}
\ No newline at end of file
+{% top %}
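As a usage sketch of the type strings documented above (assuming the
string-typed field(...) overload of the Schema descriptor alongside the
TypeInformation-typed one; field names are illustrative):

    import org.apache.flink.table.descriptors.Schema

    val schema = Schema()
      .field("user", "VARCHAR")
      .field("event", "ROW<type VARCHAR, message VARCHAR>") // named row
      .field("tags", "MULTISET<VARCHAR>")                   // multiset of strings
      .field("points", "PRIMITIVE_ARRAY<INT>")              // maps to int[]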
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 8c4ba83..296d638 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -302,7 +302,7 @@ tables:
format:
property-version: 1
type: json
- schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
+ schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
schema:
- name: rideId
type: LONG
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 934f7d4..b583072 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -128,7 +128,7 @@ tables:
- name: user
type: VARCHAR
- name: event
- type: ROW(type VARCHAR, message VARCHAR)
+ type: ROW<type VARCHAR, message VARCHAR>
connector:
type: kafka
version: "0.10"
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
index 6e370a0..ac6ff11 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
@@ -106,7 +106,7 @@ public class JsonTest extends DescriptorTestBase {
final Map<String, String> props3 = new HashMap<>();
props3.put("format.type", "json");
props3.put("format.property-version", "1");
- props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
+ props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
props3.put("format.fail-on-missing-field", "true");
final Map<String, String> props4 = new HashMap<>();
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index afc6506..9e5f075 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -23,7 +23,7 @@ import java.io.Serializable
import org.apache.commons.codec.binary.Base64
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.table.api.{TableException, Types, ValidationException}
@@ -67,6 +67,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
lazy val ROW: Keyword = Keyword("ROW")
lazy val ANY: Keyword = Keyword("ANY")
lazy val POJO: Keyword = Keyword("POJO")
+ lazy val MAP: Keyword = Keyword("MAP")
+ lazy val MULTISET: Keyword = Keyword("MULTISET")
+ lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+ lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
lazy val qualifiedName: Parser[String] =
"""\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r
@@ -74,6 +78,13 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
lazy val base64Url: Parser[String] =
"""[A-Za-z0-9_-]*""".r
+ // keep parentheses to ensure backward compatibility (this can be dropped after Flink 1.7)
+ lazy val leftBracket: PackratParser[(String)] =
+ "(" | "<"
+
+ lazy val rightBracket: PackratParser[(String)] =
+ ")" | ">"
+
lazy val atomic: PackratParser[TypeInformation[_]] =
(VARCHAR | STRING) ^^ { e => Types.STRING } |
BOOLEAN ^^ { e => Types.BOOLEAN } |
@@ -101,34 +112,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
}
lazy val namedRow: PackratParser[TypeInformation[_]] =
- ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
+ ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ {
fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
} | failure("Named row type expected.")
lazy val unnamedRow: PackratParser[TypeInformation[_]] =
- ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
+ ROW ~ leftBracket ~> rep1sep(typeInfo, ", ") <~ rightBracket ^^ {
types => Types.ROW(types: _*)
} | failure("Unnamed row type expected.")
lazy val generic: PackratParser[TypeInformation[_]] =
- ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
+ ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
typeClass =>
val clazz = loadClass(typeClass)
new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
}
- lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
- typeClass =>
- val clazz = loadClass(typeClass)
- val info = TypeExtractor.createTypeInfo(clazz)
- if (!info.isInstanceOf[PojoTypeInfo[_]]) {
- throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
- }
- info
- }
+ lazy val pojo: PackratParser[TypeInformation[_]] =
+ POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
+ typeClass =>
+ val clazz = loadClass(typeClass)
+ val info = TypeExtractor.createTypeInfo(clazz)
+ if (!info.isInstanceOf[PojoTypeInfo[_]]) {
+ throw new ValidationException(s"Class '$typeClass' is not a POJO type.")
+ }
+ info
+ }
lazy val any: PackratParser[TypeInformation[_]] =
- ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
+ ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ {
case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=>
val clazz = loadClass(typeClass)
val typeInfo = deserialize(serialized)
@@ -140,8 +152,38 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
typeInfo
}
+ lazy val genericMap: PackratParser[TypeInformation[_]] =
+ MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ {
+ case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _ =>
+ Types.MAP(keyTypeInfo, valueTypeInfo)
+ }
+
+ lazy val multiSet: PackratParser[TypeInformation[_]] =
+ MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+ case _ ~ _ ~ elementTypeInfo ~ _ =>
+ Types.MULTISET(elementTypeInfo)
+ }
+
+ lazy val primitiveArray: PackratParser[TypeInformation[_]] =
+ PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+ case _ ~ _ ~ componentTypeInfo ~ _ =>
+ Types.PRIMITIVE_ARRAY(componentTypeInfo)
+ }
+
+ lazy val objectArray: PackratParser[TypeInformation[_]] =
+ OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+ case _ ~ _ ~ componentTypeInfo ~ _ =>
+ Types.OBJECT_ARRAY(componentTypeInfo)
+ }
+
+ lazy val map: PackratParser[TypeInformation[_]] =
+ genericMap | multiSet
+
+ lazy val array: PackratParser[TypeInformation[_]] =
+ primitiveArray | objectArray
+
lazy val typeInfo: PackratParser[TypeInformation[_]] =
- namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+ namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.")
def readTypeInfo(typeString: String): TypeInformation[_] = {
parseAll(typeInfo, typeString) match {
@@ -182,10 +224,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
s"$name ${normalizeTypeInfo(f._2)}"
}
- s"${ROW.key}(${normalizedFields.mkString(", ")})"
+ s"${ROW.key}<${normalizedFields.mkString(", ")}>"
case generic: GenericTypeInfo[_] =>
- s"${ANY.key}(${generic.getTypeClass.getName})"
+ s"${ANY.key}<${generic.getTypeClass.getName}>"
case pojo: PojoTypeInfo[_] =>
// we only support very simple POJOs that only contain extracted fields
@@ -196,24 +238,28 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
case _: InvalidTypesException => None
}
extractedInfo match {
- case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+ case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>"
case _ =>
throw new TableException(
"A string representation for custom POJO types is not supported yet.")
}
- case _: CompositeType[_] =>
- throw new TableException("A string representation for composite types is not supported yet.")
+ case array: PrimitiveArrayTypeInfo[_] =>
+ s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>"
+
+ case array: ObjectArrayTypeInfo[_, _] =>
+ s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"
- case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
- _: PrimitiveArrayTypeInfo[_] =>
- throw new TableException("A string representation for array types is not supported yet.")
+ case set: MultisetTypeInfo[_] =>
+ s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>"
- case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
- throw new TableException("A string representation for map types is not supported yet.")
+ case map: MapTypeInfo[_, _] =>
+ val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo)
+ val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo)
+ s"${MAP.key}<$normalizedKey, $normalizedValue>"
case any: TypeInformation[_] =>
- s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+ s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>"
}
// ----------------------------------------------------------------------------------------------
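Since leftBracket/rightBracket above accept either delimiter, the old
parenthesis syntax keeps parsing alongside the new one; a small sketch of
the equivalence (value names are illustrative):

    import org.apache.flink.table.typeutils.TypeStringUtils

    // legacy syntax, kept for backward compatibility until after Flink 1.7
    val legacy  = TypeStringUtils.readTypeInfo("ROW(name VARCHAR, age INT)")
    // new angle-bracket syntax
    val current = TypeStringUtils.readTypeInfo("ROW<name VARCHAR, age INT>")

    legacy == current // true: both yield the same RowTypeInfo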
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 0c2e806..8d01b8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -78,9 +78,9 @@ class CsvTest extends DescriptorTestBase {
"format.fields.1.name" -> "field2",
"format.fields.1.type" -> "TIMESTAMP",
"format.fields.2.name" -> "field3",
- "format.fields.2.type" -> "ANY(java.lang.Class)",
+ "format.fields.2.type" -> "ANY<java.lang.Class>",
"format.fields.3.name" -> "field4",
- "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+ "format.fields.3.type" -> "ROW<test INT, row VARCHAR>",
"format.line-delimiter" -> "^")
val props2 = Map(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index ccac317..00e3a21 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -18,7 +18,9 @@
package org.apache.flink.table.descriptors
+import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.Person
import org.apache.flink.table.utils.TableTestBase
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -45,6 +47,10 @@ class TableDescriptorTest extends TableTestBase {
val schema = Schema()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
+ .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+ .field("myfield4", Types.MULTISET(Types.LONG))
+ .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+ .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
// CSV table source and sink do not support proctime yet
//if (isStreaming) {
// schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
@@ -56,6 +62,10 @@ class TableDescriptorTest extends TableTestBase {
val format = Csv()
.field("myfield", Types.STRING)
.field("myfield2", Types.INT)
+ .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+ .field("myfield4", Types.MULTISET(Types.LONG))
+ .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+ .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
.fieldDelimiter("#")
val descriptor: RegistrableDescriptor = if (isStreaming) {
@@ -84,11 +94,29 @@ class TableDescriptorTest extends TableTestBase {
"format.fields.0.type" -> "VARCHAR",
"format.fields.1.name" -> "myfield2",
"format.fields.1.type" -> "INT",
+ "format.fields.2.name" -> "myfield3",
+ "format.fields.2.type" -> "MAP<VARCHAR, INT>",
+ "format.fields.3.name" -> "myfield4",
+ "format.fields.3.type" -> "MULTISET<BIGINT>",
+ "format.fields.4.name" -> "myfield5",
+ "format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+ "format.fields.5.name" -> "myfield6",
+ "format.fields.5.type" ->
+ "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
"format.field-delimiter" -> "#",
"schema.0.name" -> "myfield",
"schema.0.type" -> "VARCHAR",
"schema.1.name" -> "myfield2",
- "schema.1.type" -> "INT"
+ "schema.1.type" -> "INT",
+ "schema.2.name" -> "myfield3",
+ "schema.2.type" -> "MAP<VARCHAR, INT>",
+ "schema.3.name" -> "myfield4",
+ "schema.3.type" -> "MULTISET<BIGINT>",
+ "schema.4.name" -> "myfield5",
+ "schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+ "schema.5.name" -> "myfield6",
+ "schema.5.type" ->
+ "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>"
)
val expectedProperties = if (isStreaming) {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 9ea8be0..f35b0e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,14 +18,12 @@
package org.apache.flink.table.typeutils
-import java.util
-
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
import org.apache.flink.table.api.Types
import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Assert, Test}
+import org.junit.Test
/**
* Tests for string-based representation of [[TypeInformation]].
@@ -49,7 +47,7 @@ class TypeStringUtilsTest {
// unsupported type information
testReadAndWrite(
- "ANY(java.lang.Void, " +
+ "ANY<java.lang.Void, " +
"rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
"ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
"c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
@@ -59,32 +57,58 @@ class TypeStringUtilsTest {
"cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
"AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
"YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
- "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
+ "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA>",
BasicTypeInfo.VOID_TYPE_INFO)
}
@Test
def testWriteComplexTypes(): Unit = {
testReadAndWrite(
- "ROW(f0 DECIMAL, f1 TINYINT)",
+ "ROW<f0 DECIMAL, f1 TINYINT>",
Types.ROW(Types.DECIMAL, Types.BYTE))
testReadAndWrite(
- "ROW(hello DECIMAL, world TINYINT)",
+ "ROW<hello DECIMAL, world TINYINT>",
Types.ROW(
Array[String]("hello", "world"),
Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
testReadAndWrite(
- "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+ "POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>",
TypeExtractor.createTypeInfo(classOf[Person]))
testReadAndWrite(
- "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+ "ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>",
TypeExtractor.createTypeInfo(classOf[NonPojo]))
+ testReadAndWrite(
+ "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>",
+ Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))
+ )
+
+ testReadAndWrite(
+ "MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>",
+ Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE))
+ )
+
+ testReadAndWrite(
+ "PRIMITIVE_ARRAY<TINYINT>",
+ Types.PRIMITIVE_ARRAY(Types.BYTE)
+ )
+
+ testReadAndWrite(
+ "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
+ Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))
+ )
+
// test escaping
assertTrue(
+ TypeStringUtils.readTypeInfo("ROW<\"he \\nllo\" DECIMAL, world TINYINT>")
+ .asInstanceOf[RowTypeInfo].getFieldNames
+ .sameElements(Array[String]("he \nllo", "world")))
+
+ // test backward compatibility with brackets ()
+ assertTrue(
TypeStringUtils.readTypeInfo("ROW(\"he \\nllo\" DECIMAL, world TINYINT)")
.asInstanceOf[RowTypeInfo].getFieldNames
.sameElements(Array[String]("he \nllo", "world")))