Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/11 16:25:47 UTC

[flink] branch release-1.6 updated: [FLINK-10170] [table] Add string representation for all Table & SQL API types

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 636a526  [FLINK-10170] [table] Add string representation for all Table & SQL API types
636a526 is described below

commit 636a526ab2ce5e2f2b00597d53c2455b6e53bfa8
Author: jerryjzhang <zh...@163.com>
AuthorDate: Sun Aug 19 22:27:01 2018 +0800

    [FLINK-10170] [table] Add string representation for all Table & SQL API types
    
    Since 1.6, the recommended way of creating source/sink tables has been to use
    connector/format/schema descriptors. This commit adds a string-based
    representation for all types supported by the Table & SQL API.
    
    We use a syntax similar to that of Hive and other SQL projects.
    
    This closes #6578.
---
 docs/dev/table/sqlClient.md                        |  2 +-
 .../test-scripts/test_sql_client.sh                |  2 +-
 .../apache/flink/table/descriptors/JsonTest.java   |  2 +-
 .../flink/table/typeutils/TypeStringUtils.scala    | 98 ++++++++++++++++------
 .../apache/flink/table/descriptors/CsvTest.scala   |  4 +-
 .../table/descriptors/TableDescriptorTest.scala    | 30 ++++++-
 .../table/typeutils/TypeStringUtilsTest.scala      | 42 ++++++++--
 7 files changed, 139 insertions(+), 41 deletions(-)
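
[Editor's note: as a quick illustration of the new angle-bracket type syntax, here is a minimal
Scala sketch. The type strings are taken verbatim from the tests in this patch, and
TypeStringUtils.readTypeInfo is the parser entry point shown in the diff below; the wrapper
object and the println are illustrative only and not part of the commit.]

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.typeutils.TypeStringUtils

    object TypeStringSyntaxExample {
      def main(args: Array[String]): Unit = {
        // Angle-bracket syntax introduced by this commit.
        val row: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("ROW<f0 DECIMAL, f1 TINYINT>")
        val map: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>")
        val primitiveArray: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("PRIMITIVE_ARRAY<TINYINT>")

        // The old parenthesis syntax is still parsed for backward compatibility
        // (to be dropped after Flink 1.7, per the parser comment in the diff).
        val legacyRow: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("ROW(f0 DECIMAL, f1 TINYINT)")

        println(Seq(row, map, primitiveArray, legacyRow).mkString("\n"))
      }
    }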

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index ed36354..c88ed38 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -264,7 +264,7 @@ tables:
     format:
       property-version: 1
       type: json
-      schema: "ROW(rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP)"
+      schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
     schema:
       - name: rideId
         type: LONG
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 934f7d4..b583072 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -128,7 +128,7 @@ tables:
       - name: user
         type: VARCHAR
       - name: event
-        type: ROW(type VARCHAR, message VARCHAR)
+        type: ROW<type VARCHAR, message VARCHAR>
     connector:
       type: kafka
       version: "0.10"
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
index 6e370a0..ac6ff11 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
@@ -106,7 +106,7 @@ public class JsonTest extends DescriptorTestBase {
 		final Map<String, String> props3 = new HashMap<>();
 		props3.put("format.type", "json");
 		props3.put("format.property-version", "1");
-		props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
+		props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
 		props3.put("format.fail-on-missing-field", "true");
 
 		final Map<String, String> props4 = new HashMap<>();
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index afc6506..9e5f075 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -23,7 +23,7 @@ import java.io.Serializable
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.table.api.{TableException, Types, ValidationException}
@@ -67,6 +67,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
   lazy val ROW: Keyword = Keyword("ROW")
   lazy val ANY: Keyword = Keyword("ANY")
   lazy val POJO: Keyword = Keyword("POJO")
+  lazy val MAP: Keyword = Keyword("MAP")
+  lazy val MULTISET: Keyword = Keyword("MULTISET")
+  lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+  lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
 
   lazy val qualifiedName: Parser[String] =
     """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r
@@ -74,6 +78,13 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
   lazy val base64Url: Parser[String] =
     """[A-Za-z0-9_-]*""".r
 
+  // keep parenthesis to ensure backward compatibility (this can be dropped after Flink 1.7)
+  lazy val leftBracket: PackratParser[(String)] =
+    "(" | "<"
+
+  lazy val rightBracket: PackratParser[(String)] =
+    ")" | ">"
+
   lazy val atomic: PackratParser[TypeInformation[_]] =
     (VARCHAR | STRING) ^^ { e => Types.STRING } |
     BOOLEAN ^^ { e => Types.BOOLEAN } |
@@ -101,34 +112,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
     }
 
   lazy val namedRow: PackratParser[TypeInformation[_]] =
-    ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
+    ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ {
       fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
     } | failure("Named row type expected.")
 
   lazy val unnamedRow: PackratParser[TypeInformation[_]] =
-    ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
+    ROW ~ leftBracket ~> rep1sep(typeInfo, ", ") <~ rightBracket ^^ {
       types => Types.ROW(types: _*)
     } | failure("Unnamed row type expected.")
 
   lazy val generic: PackratParser[TypeInformation[_]] =
-    ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
+    ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
       typeClass =>
         val clazz = loadClass(typeClass)
         new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
     }
 
-  lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
-    typeClass =>
-      val clazz = loadClass(typeClass)
-      val info = TypeExtractor.createTypeInfo(clazz)
-      if (!info.isInstanceOf[PojoTypeInfo[_]]) {
-        throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
-      }
-      info
-  }
+  lazy val pojo: PackratParser[TypeInformation[_]] =
+    POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
+      typeClass =>
+        val clazz = loadClass(typeClass)
+        val info = TypeExtractor.createTypeInfo(clazz)
+        if (!info.isInstanceOf[PojoTypeInfo[_]]) {
+          throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
+        }
+        info
+    }
 
   lazy val any: PackratParser[TypeInformation[_]] =
-    ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
+    ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ {
       case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=>
         val clazz = loadClass(typeClass)
         val typeInfo = deserialize(serialized)
@@ -140,8 +152,38 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
         typeInfo
     }
 
+  lazy val genericMap: PackratParser[TypeInformation[_]] =
+    MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _=>
+        Types.MAP(keyTypeInfo, valueTypeInfo)
+    }
+
+  lazy val multiSet: PackratParser[TypeInformation[_]] =
+    MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ elementTypeInfo ~ _ =>
+        Types.MULTISET(elementTypeInfo)
+    }
+
+  lazy val primitiveArray: PackratParser[TypeInformation[_]] =
+    PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ componentTypeInfo ~ _ =>
+        Types.PRIMITIVE_ARRAY(componentTypeInfo)
+    }
+
+  lazy val objectArray: PackratParser[TypeInformation[_]] =
+    OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ componentTypeInfo ~ _ =>
+        Types.OBJECT_ARRAY(componentTypeInfo)
+    }
+
+  lazy val map: PackratParser[TypeInformation[_]] =
+    genericMap | multiSet
+
+  lazy val array: PackratParser[TypeInformation[_]] =
+    primitiveArray | objectArray
+
   lazy val typeInfo: PackratParser[TypeInformation[_]] =
-    namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+    namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.")
 
   def readTypeInfo(typeString: String): TypeInformation[_] = {
     parseAll(typeInfo, typeString) match {
@@ -182,10 +224,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
 
         s"$name ${normalizeTypeInfo(f._2)}"
       }
-      s"${ROW.key}(${normalizedFields.mkString(", ")})"
+      s"${ROW.key}<${normalizedFields.mkString(", ")}>"
 
     case generic: GenericTypeInfo[_] =>
-      s"${ANY.key}(${generic.getTypeClass.getName})"
+      s"${ANY.key}<${generic.getTypeClass.getName}>"
 
     case pojo: PojoTypeInfo[_] =>
       // we only support very simple POJOs that only contain extracted fields
@@ -196,24 +238,28 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
         case _: InvalidTypesException => None
       }
       extractedInfo match {
-        case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+        case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>"
         case _ =>
           throw new TableException(
             "A string representation for custom POJO types is not supported yet.")
       }
 
-    case _: CompositeType[_] =>
-      throw new TableException("A string representation for composite types is not supported yet.")
+    case array: PrimitiveArrayTypeInfo[_] =>
+      s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>"
+
+    case array: ObjectArrayTypeInfo[_, _] =>
+      s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"
 
-    case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
-         _: PrimitiveArrayTypeInfo[_] =>
-      throw new TableException("A string representation for array types is not supported yet.")
+    case set: MultisetTypeInfo[_] =>
+      s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>"
 
-    case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
-      throw new TableException("A string representation for map types is not supported yet.")
+    case map: MapTypeInfo[_, _] =>
+      val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo)
+      val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo)
+      s"${MAP.key}<$normalizedKey, $normalizedValue>"
 
     case any: TypeInformation[_] =>
-      s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+      s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>"
   }
 
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 0c2e806..8d01b8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -78,9 +78,9 @@ class CsvTest extends DescriptorTestBase {
       "format.fields.1.name" -> "field2",
       "format.fields.1.type" -> "TIMESTAMP",
       "format.fields.2.name" -> "field3",
-      "format.fields.2.type" -> "ANY(java.lang.Class)",
+      "format.fields.2.type" -> "ANY<java.lang.Class>",
       "format.fields.3.name" -> "field4",
-      "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+      "format.fields.3.type" -> "ROW<test INT, row VARCHAR>",
       "format.line-delimiter" -> "^")
 
     val props2 = Map(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index ccac317..00e3a21 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.Person
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -45,6 +47,10 @@ class TableDescriptorTest extends TableTestBase {
     val schema = Schema()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
     // CSV table source and sink do not support proctime yet
     //if (isStreaming) {
     //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
@@ -56,6 +62,10 @@ class TableDescriptorTest extends TableTestBase {
     val format = Csv()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
       .fieldDelimiter("#")
 
     val descriptor: RegistrableDescriptor = if (isStreaming) {
@@ -84,11 +94,29 @@ class TableDescriptorTest extends TableTestBase {
       "format.fields.0.type" -> "VARCHAR",
       "format.fields.1.name" -> "myfield2",
       "format.fields.1.type" -> "INT",
+      "format.fields.2.name" -> "myfield3",
+      "format.fields.2.type" -> "MAP<VARCHAR, INT>",
+      "format.fields.3.name" -> "myfield4",
+      "format.fields.3.type" -> "MULTISET<BIGINT>",
+      "format.fields.4.name" -> "myfield5",
+      "format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "format.fields.5.name" -> "myfield6",
+      "format.fields.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
       "format.field-delimiter" -> "#",
       "schema.0.name" -> "myfield",
       "schema.0.type" -> "VARCHAR",
       "schema.1.name" -> "myfield2",
-      "schema.1.type" -> "INT"
+      "schema.1.type" -> "INT",
+      "schema.2.name" -> "myfield3",
+      "schema.2.type" -> "MAP<VARCHAR, INT>",
+      "schema.3.name" -> "myfield4",
+      "schema.3.type" -> "MULTISET<BIGINT>",
+      "schema.4.name" -> "myfield5",
+      "schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "schema.5.name" -> "myfield6",
+      "schema.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>"
     )
 
     val expectedProperties = if (isStreaming) {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 9ea8be0..f35b0e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.typeutils
 
-import java.util
-
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 /**
   * Tests for string-based representation of [[TypeInformation]].
@@ -49,7 +47,7 @@ class TypeStringUtilsTest {
 
     // unsupported type information
     testReadAndWrite(
-      "ANY(java.lang.Void, " +
+      "ANY<java.lang.Void, " +
         "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
         "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
         "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
@@ -59,32 +57,58 @@ class TypeStringUtilsTest {
         "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
         "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
         "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
-        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
+        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA>",
       BasicTypeInfo.VOID_TYPE_INFO)
   }
 
   @Test
   def testWriteComplexTypes(): Unit = {
     testReadAndWrite(
-      "ROW(f0 DECIMAL, f1 TINYINT)",
+      "ROW<f0 DECIMAL, f1 TINYINT>",
       Types.ROW(Types.DECIMAL, Types.BYTE))
 
     testReadAndWrite(
-      "ROW(hello DECIMAL, world TINYINT)",
+      "ROW<hello DECIMAL, world TINYINT>",
       Types.ROW(
         Array[String]("hello", "world"),
         Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
 
     testReadAndWrite(
-      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+      "POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>",
       TypeExtractor.createTypeInfo(classOf[Person]))
 
     testReadAndWrite(
-      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+      "ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>",
       TypeExtractor.createTypeInfo(classOf[NonPojo]))
 
+    testReadAndWrite(
+      "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "PRIMITIVE_ARRAY<TINYINT>",
+      Types.PRIMITIVE_ARRAY(Types.BYTE)
+    )
+
+    testReadAndWrite(
+      "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
+      Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))
+    )
+
     // test escaping
     assertTrue(
+      TypeStringUtils.readTypeInfo("ROW<\"he         \\nllo\" DECIMAL, world TINYINT>")
+        .asInstanceOf[RowTypeInfo].getFieldNames
+        .sameElements(Array[String]("he         \nllo", "world")))
+
+    // test backward compatibility with brackets ()
+    assertTrue(
       TypeStringUtils.readTypeInfo("ROW(\"he         \\nllo\" DECIMAL, world TINYINT)")
         .asInstanceOf[RowTypeInfo].getFieldNames
         .sameElements(Array[String]("he         \nllo", "world")))