Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/18 12:55:06 UTC

[GitHub] [flink] twalthr opened a new pull request #12228: [FLINK-17541][table] Support inline structured types

twalthr opened a new pull request #12228:
URL: https://github.com/apache/flink/pull/12228


   ## What is the purpose of the change
   
   This enables inline structured types in the Blink planner. Inline structured types are extracted (e.g. in UDFs) and don't need to be registered in a catalog. This PR finalizes FLIP-65 for scalar and table functions, because existing functions can now be migrated to the new type system by replacing them.
   
   It also aims to add structured type support to all switch/case statements that currently handle `ROW`. Structured type support should still be declared experimental until there are more tests and structured types are also supported in sources and sinks.
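For readers new to the feature: "extracted" means the structured type is derived from a class definition instead of from a catalog entry. The following minimal Java sketch shows that idea with plain reflection, collecting the public instance fields of a POJO as (name, type) attributes. `InlineStructuredTypeSketch`, `User`, and `extractAttributes` are hypothetical names; Flink's real extraction has further rules (for example around getters, setters, and constructors) that are not modeled here.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

class InlineStructuredTypeSketch {

    // A POJO that a UDF might accept or return; it is never registered in a catalog.
    static class User {
        public String name;
        public int age;
    }

    // Hypothetical helper: treat every public, non-static field as an attribute
    // and record its simple Java type name.
    static Map<String, String> extractAttributes(Class<?> clazz) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Field field : clazz.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (Modifier.isPublic(modifiers) && !Modifier.isStatic(modifiers)) {
                attributes.put(field.getName(), field.getType().getSimpleName());
            }
        }
        return attributes;
    }
}
```

Because `getDeclaredFields` makes no ordering guarantee, this sketch should not be relied on to determine attribute order.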
   
   ## Brief change log
   
   See commit messages.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows: `FunctionITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844",
       "triggerID" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "203fa4cd14fb09f270ee32344da183bf5113c7ff",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1880",
       "triggerID" : "203fa4cd14fb09f270ee32344da183bf5113c7ff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3f37b2e0ceb50378d5996fc471024c952ebdd0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844) 
   * 203fa4cd14fb09f270ee32344da183bf5113c7ff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1880) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844",
       "triggerID" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af3f37b2e0ceb50378d5996fc471024c952ebdd0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844",
       "triggerID" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "203fa4cd14fb09f270ee32344da183bf5113c7ff",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1880",
       "triggerID" : "203fa4cd14fb09f270ee32344da183bf5113c7ff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 203fa4cd14fb09f270ee32344da183bf5113c7ff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1880) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] twalthr edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
twalthr edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630834686


   @dawidwys I updated the PR. I found another shortcoming that I fixed while addressing your comments.



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844",
       "triggerID" : "af3f37b2e0ceb50378d5996fc471024c952ebdd0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c672d3d2f0654b3c5d1a18b20d73804e450382a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741) 
   * af3f37b2e0ceb50378d5996fc471024c952ebdd0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] dawidwys commented on a change in pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12228:
URL: https://github.com/apache/flink/pull/12228#discussion_r427120645



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
##########
@@ -65,32 +80,55 @@ public static LogicalType removeTimeAttributes(LogicalType logicalType) {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return Long.class;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return TimestampData.class;
 			case FLOAT:
 				return Float.class;
 			case DOUBLE:
 				return Double.class;
-			case CHAR:
-			case VARCHAR:
-				return StringData.class;
-			case DECIMAL:
-				return DecimalData.class;
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return TimestampData.class;
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return ArrayData.class;
-			case MAP:
 			case MULTISET:
+			case MAP:
 				return MapData.class;
 			case ROW:
+			case STRUCTURED_TYPE:
 				return RowData.class;
-			case BINARY:
-			case VARBINARY:
-				return byte[].class;
+			case DISTINCT_TYPE:
+				return toInternalConversionClass(((DistinctType) type).getSourceType());
 			case RAW:
 				return RawValueData.class;
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
+			default:
+				throw new IllegalArgumentException();
+		}
+	}
+
+	/**
+	 * Converts the given composite type to a {@link RowType}.
+	 *
+	 * @see LogicalTypeChecks#isCompositeType(LogicalType)
+	 */
+	public static RowType toRowType(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case ROW:
+				return (RowType) type;
+			case STRUCTURED_TYPE:
+				final StructuredType structuredType = (StructuredType) type;
+				return new RowType(
+					structuredType.isNullable(),
+					structuredType.getAttributes().stream()
+						.map(a -> new RowField(a.getName(), a.getType(), a.getDescription().orElse(null)))
+						.collect(Collectors.toList()));
+			case DISTINCT_TYPE:
+				return toRowType(((DistinctType) type).getSourceType());
 			default:
-				throw new UnsupportedOperationException("Unsupported type: " + type);
+				throw new IllegalArgumentException();

Review comment:
       Let's add the type to the error message.
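A minimal Java sketch of the suggestion, modeled on the shape of `toRowType` above: a ROW passes through, a STRUCTURED_TYPE's attributes become row fields, a DISTINCT_TYPE is unwrapped to its source type, and the default branch names the rejected type. `Root`, `Type`, and `toRowFieldNames` are hypothetical stand-ins that model only field names; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

class ToRowTypeSketch {

    enum Root { ROW, STRUCTURED_TYPE, DISTINCT_TYPE, INTEGER }

    // Hypothetical stand-in for a logical type; only field/attribute names are modeled.
    static final class Type {
        final Root root;
        final List<String> fieldNames; // fields of a ROW or attributes of a STRUCTURED_TYPE
        final Type sourceType;         // only set for DISTINCT_TYPE

        Type(Root root, List<String> fieldNames, Type sourceType) {
            this.root = root;
            this.fieldNames = fieldNames;
            this.sourceType = sourceType;
        }

        @Override
        public String toString() {
            return root.name() + fieldNames;
        }
    }

    static List<String> toRowFieldNames(Type type) {
        switch (type.root) {
            case ROW:
                return type.fieldNames;
            case STRUCTURED_TYPE:
                // Copy the attributes into a fresh list of row fields.
                return new ArrayList<>(type.fieldNames);
            case DISTINCT_TYPE:
                return toRowFieldNames(type.sourceType);
            default:
                // Name the offending type, as requested in the review.
                throw new IllegalArgumentException("Unsupported composite type: " + type);
        }
    }
}
```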

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
##########
@@ -65,32 +80,55 @@ public static LogicalType removeTimeAttributes(LogicalType logicalType) {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return Long.class;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return TimestampData.class;
 			case FLOAT:
 				return Float.class;
 			case DOUBLE:
 				return Double.class;
-			case CHAR:
-			case VARCHAR:
-				return StringData.class;
-			case DECIMAL:
-				return DecimalData.class;
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return TimestampData.class;
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return ArrayData.class;
-			case MAP:
 			case MULTISET:
+			case MAP:
 				return MapData.class;
 			case ROW:
+			case STRUCTURED_TYPE:
 				return RowData.class;
-			case BINARY:
-			case VARBINARY:
-				return byte[].class;
+			case DISTINCT_TYPE:
+				return toInternalConversionClass(((DistinctType) type).getSourceType());
 			case RAW:
 				return RawValueData.class;
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
+			default:
+				throw new IllegalArgumentException();

Review comment:
       Let's add the type to the error message.
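The same idea applied to `toInternalConversionClass`, again as a self-contained sketch with hypothetical stand-ins: internal classes are represented by their simple names so the example compiles without Flink on the classpath, and only a handful of type roots are modeled. It also shows the two structural points of the diff: ROW and STRUCTURED_TYPE share one internal representation, and a DISTINCT_TYPE delegates to its source type.

```java
class InternalClassSketch {

    enum Root { INTEGER, BIGINT, VARCHAR, ROW, STRUCTURED_TYPE, DISTINCT_TYPE, SYMBOL }

    // Hypothetical stand-in for a logical type.
    static final class Type {
        final Root root;
        final Type sourceType; // only set for DISTINCT_TYPE

        Type(Root root, Type sourceType) {
            this.root = root;
            this.sourceType = sourceType;
        }

        @Override
        public String toString() {
            return sourceType == null ? root.name() : root.name() + "(" + sourceType + ")";
        }
    }

    // Returns the simple name of the internal data structure for the given type.
    static String toInternalClassName(Type type) {
        switch (type.root) {
            case INTEGER:
                return "Integer";
            case BIGINT:
                return "Long";
            case VARCHAR:
                return "StringData";
            case ROW:
            case STRUCTURED_TYPE:
                // Both composite kinds use the same internal representation.
                return "RowData";
            case DISTINCT_TYPE:
                // A distinct type uses the representation of its source type.
                return toInternalClassName(type.sourceType);
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }
}
```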

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
##########
@@ -161,117 +163,116 @@ object CodeGenUtils {
   // works, but for boxed types we need this:
   // Float a = 1.0f;
   // Byte b = (byte)(float) a;
+  @tailrec
   def primitiveTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER => "int"
-    case BIGINT => "long"
-    case SMALLINT => "short"
+    // ordered by type root definition
+    case BOOLEAN => "boolean"
     case TINYINT => "byte"
+    case SMALLINT => "short"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => "int"
+    case BIGINT | INTERVAL_DAY_TIME => "long"
     case FLOAT => "float"
     case DOUBLE => "double"
-    case BOOLEAN => "boolean"
-
-    case DATE => "int"
-    case TIME_WITHOUT_TIME_ZONE => "int"
-    case INTERVAL_YEAR_MONTH => "int"
-    case INTERVAL_DAY_TIME => "long"
-
+    case DISTINCT_TYPE => primitiveTypeTermForType(t.asInstanceOf[DistinctType].getSourceType)
     case _ => boxedTypeTermForType(t)
   }
 
+  @tailrec
   def boxedTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER => className[JInt]
-    case BIGINT => className[JLong]
-    case SMALLINT => className[JShort]
+    // ordered by type root definition
+    case CHAR | VARCHAR => BINARY_STRING
+    case BOOLEAN => className[JBoolean]
+    case BINARY | VARBINARY => "byte[]"
+    case DECIMAL => className[DecimalData]
     case TINYINT => className[JByte]
+    case SMALLINT => className[JShort]
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => className[JInt]
+    case BIGINT | INTERVAL_DAY_TIME => className[JLong]
     case FLOAT => className[JFloat]
     case DOUBLE => className[JDouble]
-    case BOOLEAN => className[JBoolean]
-
-    case DATE => className[JInt]
-    case TIME_WITHOUT_TIME_ZONE => className[JInt]
-    case INTERVAL_YEAR_MONTH => className[JInt]
-    case INTERVAL_DAY_TIME => className[JLong]
-
-    case VARCHAR | CHAR => BINARY_STRING
-    case VARBINARY | BINARY => "byte[]"
-
-    case DECIMAL => className[DecimalData]
+    case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[TimestampData]
+    case TIMESTAMP_WITH_TIME_ZONE => throw new UnsupportedOperationException
     case ARRAY => className[ArrayData]
     case MULTISET | MAP => className[MapData]
-    case ROW => className[RowData]
+    case ROW | STRUCTURED_TYPE => className[RowData]
     case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[TimestampData]
-
+    case DISTINCT_TYPE => boxedTypeTermForType(t.asInstanceOf[DistinctType].getSourceType)
+    case NULL => className[JObject] // special case for untyped null literals
     case RAW => className[BinaryRawValueData[_]]
-
-    // special case for untyped null literals
-    case NULL => className[JObject]
+    case SYMBOL | UNRESOLVED => throw new IllegalArgumentException()
   }
 
   /**
     * Gets the default value for a primitive type, and null for generic types
     */
+  @tailrec
   def primitiveDefaultValue(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER | TINYINT | SMALLINT => "-1"
-    case BIGINT => "-1L"
+    // ordered by type root definition
+    case CHAR | VARCHAR => s"$BINARY_STRING.EMPTY_UTF8"
+    case BOOLEAN => "false"
+    case TINYINT | SMALLINT | INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => "-1"
+    case BIGINT | INTERVAL_DAY_TIME => "-1L"
     case FLOAT => "-1.0f"
     case DOUBLE => "-1.0d"
-    case BOOLEAN => "false"
-    case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8"
 
-    case DATE | TIME_WITHOUT_TIME_ZONE => "-1"
-    case INTERVAL_YEAR_MONTH => "-1"
-    case INTERVAL_DAY_TIME => "-1L"
+    case DISTINCT_TYPE => primitiveDefaultValue(t.asInstanceOf[DistinctType].getSourceType)
 
     case _ => "null"
   }
 
-  /**
-    * If it's internally compatible, don't need to DataStructure converter.
-    * clazz != classOf[Row] => Row can only infer GenericType[Row].
-    */
-  def isInternalClass(t: DataType): Boolean = {
-    val clazz = t.getConversionClass
-    clazz != classOf[Object] && clazz != classOf[Row] &&
-        (classOf[RowData].isAssignableFrom(clazz) ||
-            clazz == toInternalConversionClass(fromDataTypeToLogicalType(t)))
-  }
-
+  @tailrec
   def hashCodeForType(
-      ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
-    case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
-    case TINYINT => s"${className[JByte]}.hashCode($term)"
-    case SMALLINT => s"${className[JShort]}.hashCode($term)"
-    case INTEGER => s"${className[JInt]}.hashCode($term)"
-    case BIGINT => s"${className[JLong]}.hashCode($term)"
+      ctx: CodeGeneratorContext,
+      t: LogicalType,
+      term: String)
+    : String = t.getTypeRoot match {
+    // ordered by type root definition
+    case VARCHAR | CHAR =>
+      s"$term.hashCode()"
+    case BOOLEAN =>
+      s"${className[JBoolean]}.hashCode($term)"
+    case BINARY | VARBINARY =>
+      s"${className[MurmurHashUtil]}.hashUnsafeBytes($term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
+    case DECIMAL =>
+      s"$term.hashCode()"
+    case TINYINT =>
+      s"${className[JByte]}.hashCode($term)"
+    case SMALLINT =>
+      s"${className[JShort]}.hashCode($term)"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH =>
+      s"${className[JInt]}.hashCode($term)"
+    case BIGINT | INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
     case FLOAT => s"${className[JFloat]}.hashCode($term)"
     case DOUBLE => s"${className[JDouble]}.hashCode($term)"
-    case VARCHAR | CHAR => s"$term.hashCode()"
-    case VARBINARY | BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
-      s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
-    case DECIMAL => s"$term.hashCode()"
-    case DATE => s"${className[JInt]}.hashCode($term)"
-    case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
     case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
       s"$term.hashCode()"
-    case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
+    case TIMESTAMP_WITH_TIME_ZONE | ARRAY | MULTISET | MAP =>
+      throw new UnsupportedOperationException(s"Unsupported type for hashing: $t")
     case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
-    case ARRAY => throw new IllegalArgumentException(s"Not support type to hash: $t")
-    case ROW =>
-      val rowType = t.asInstanceOf[RowType]
+    case ROW | STRUCTURED_TYPE =>
+      val fieldCount = getFieldCount(t)
       val subCtx = CodeGeneratorContext(ctx.tableConfig)
       val genHash = HashCodeGenerator.generateRowHash(
-        subCtx, rowType, "SubHashRow", (0 until rowType.getFieldCount).toArray)
+        subCtx, t, "SubHashRow", (0 until fieldCount).toArray)
       ctx.addReusableInnerClass(genHash.getClassName, genHash.getCode)
       val refs = ctx.addReusableObject(subCtx.references.toArray, "subRefs")
       val hashFunc = newName("hashFunc")
       ctx.addReusableMember(s"${classOf[HashFunction].getCanonicalName} $hashFunc;")
       ctx.addReusableInitStatement(s"$hashFunc = new ${genHash.getClassName}($refs);")
       s"$hashFunc.hashCode($term)"
+    case DISTINCT_TYPE =>
+      hashCodeForType(ctx, t.asInstanceOf[DistinctType].getSourceType, term)
     case RAW =>
-      val gt = t.asInstanceOf[TypeInformationRawType[_]]
-      val serTerm = ctx.addReusableObject(
-        gt.getTypeInformation.createSerializer(new ExecutionConfig), "serializer")
+      val serializer = t match {
+        case rt: RawType[_] =>
+          rt.getTypeSerializer
+        case tirt: TypeInformationRawType[_] =>
+          tirt.getTypeInformation.createSerializer(new ExecutionConfig)
+      }
+      val serTerm = ctx.addReusableObject(serializer, "serializer")
       s"$BINARY_RAW_VALUE.getJavaObjectFromRawValueData($term, $serTerm).hashCode()"
+    case NULL | SYMBOL | UNRESOLVED =>
+      throw new IllegalArgumentException()

Review comment:
       ditto
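For `hashCodeForType`, a sketch along the same lines: like the code generator, the hypothetical helper returns a Java source expression rather than a hash value, and both failure branches include the type. The split between `UnsupportedOperationException` and `IllegalArgumentException` follows the diff, read here as "a valid type that cannot be hashed" versus "a root that should never reach this method"; the names and the subset of roots are assumptions.

```java
class HashCodeExprSketch {

    enum Root { BOOLEAN, INTEGER, BIGINT, VARCHAR, ARRAY, DISTINCT_TYPE, NULL }

    // Hypothetical stand-in for a logical type.
    static final class Type {
        final Root root;
        final Type sourceType; // only set for DISTINCT_TYPE

        Type(Root root, Type sourceType) {
            this.root = root;
            this.sourceType = sourceType;
        }

        @Override
        public String toString() {
            return sourceType == null ? root.name() : root.name() + "(" + sourceType + ")";
        }
    }

    // Returns Java source code that computes the hash of the value stored in `term`.
    static String hashCodeExpr(Type type, String term) {
        switch (type.root) {
            case BOOLEAN:
                return "Boolean.hashCode(" + term + ")";
            case INTEGER:
                return "Integer.hashCode(" + term + ")";
            case BIGINT:
                return "Long.hashCode(" + term + ")";
            case VARCHAR:
                return term + ".hashCode()";
            case ARRAY:
                // A valid type that this helper does not know how to hash.
                throw new UnsupportedOperationException("Unsupported type for hashing: " + type);
            case DISTINCT_TYPE:
                return hashCodeExpr(type.sourceType, term);
            default:
                // Roots such as NULL are not expected here at all.
                throw new IllegalArgumentException("Unexpected type for hashing: " + type);
        }
    }
}
```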

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
##########
@@ -42,12 +43,24 @@
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link DataTypeUtils}.
  */
 public class DataTypeUtilsTest {
+
+	@Test
+	public void testIsInternalClass() {
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT()));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT().notNull().bridgedTo(int.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));

Review comment:
       Duplicated row... Did you mean `StructuredType` by chance?
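The duplicated assertion leaves a gap where a second composite case (such as a structured type bridged to `RowData`) would be exercised. As a self-contained illustration of what such a check verifies, here is a Java sketch loosely based on the Scala `isInternalClass` removed elsewhere in this PR: a conversion class is internal if it implements the row interface or equals the internal class of the logical type. `RowDataLike`, `GenericRowLike`, and `isInternal` are hypothetical stand-ins; treating a primitive as equal to its boxed internal class is an assumption taken from the test's `INT().notNull().bridgedTo(int.class)` case, and the removed `Row` exclusion is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

class IsInternalSketch {

    // Hypothetical stand-ins for Flink's RowData interface and one implementation.
    interface RowDataLike {}

    static final class GenericRowLike implements RowDataLike {}

    // Assumption: a primitive conversion class counts as its boxed internal class.
    private static final Map<Class<?>, Class<?>> BOXED = new HashMap<>();

    static {
        BOXED.put(int.class, Integer.class);
        BOXED.put(long.class, Long.class);
        BOXED.put(boolean.class, Boolean.class);
        BOXED.put(double.class, Double.class);
    }

    static boolean isInternal(Class<?> conversionClass, Class<?> internalClass) {
        if (conversionClass == Object.class) {
            return false; // too generic to skip conversion
        }
        if (RowDataLike.class.isAssignableFrom(conversionClass)) {
            return true; // any row implementation, whether for ROW or STRUCTURED_TYPE
        }
        Class<?> boxed = BOXED.getOrDefault(conversionClass, conversionClass);
        return boxed == internalClass;
    }
}
```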

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
##########
@@ -57,9 +59,8 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
       // TODO merge ScalarOperatorGens.generateEquals.
       val (equalsCode, equalsResult) = if (isInternalPrimitive(fieldType)) {
         ("", s"$leftFieldTerm == $rightFieldTerm")
-      } else if (isRowData(fieldType)) {
-        val equaliserGenerator = new EqualiserCodeGenerator(
-          fieldType.asInstanceOf[RowType].getChildren.asScala.toArray)
+      } else if (isCompositeType(fieldType)) {

Review comment:
       TODO: check it

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
##########
@@ -360,16 +361,21 @@ object AggCodeGenHelper {
 
     aggBufferExprs.zip(initAggBufferExprs).map {
       case (aggBufVar, initExpr) =>
-        val resultCode = aggBufVar.resultType.getTypeRoot match {
-          case VARCHAR | CHAR | ROW | ARRAY | MULTISET | MAP =>
-            val serializer = InternalSerializers.create(
-              aggBufVar.resultType, new ExecutionConfig)
+
+        @tailrec
+        def getResultCode(t: LogicalType): String = t.getTypeRoot match {

Review comment:
       Can we move this method out of the `genInitFlatAggregateBuffer`?
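One way to read the suggestion, sketched in Java: lift the type inspection into a standalone static helper that `genInitFlatAggregateBuffer` (or any other caller) can reuse. For the roots listed in the original branch (VARCHAR, CHAR, ROW, ARRAY, MULTISET, MAP) the code creates a serializer via `InternalSerializers.create`; adding STRUCTURED_TYPE to that set is an assumption in line with this PR. Java has no `@tailrec`, so distinct types are unwrapped with a loop. `AggBufferInitSketch` and `needsSerializer` are hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;

class AggBufferInitSketch {

    enum Root { INTEGER, BIGINT, CHAR, VARCHAR, ROW, STRUCTURED_TYPE, ARRAY, MULTISET, MAP, DISTINCT_TYPE }

    // Hypothetical stand-in for a logical type.
    static final class Type {
        final Root root;
        final Type sourceType; // only set for DISTINCT_TYPE

        Type(Root root, Type sourceType) {
            this.root = root;
            this.sourceType = sourceType;
        }
    }

    // Roots handled through a serializer in the original branch; STRUCTURED_TYPE is assumed.
    private static final Set<Root> SERIALIZER_ROOTS = EnumSet.of(
            Root.CHAR, Root.VARCHAR, Root.ROW, Root.STRUCTURED_TYPE,
            Root.ARRAY, Root.MULTISET, Root.MAP);

    // Top-level helper instead of a method nested inside the buffer-initialization code.
    static boolean needsSerializer(Type type) {
        Type current = type;
        // Iterative equivalent of the tail-recursive DISTINCT_TYPE unwrapping.
        while (current.root == Root.DISTINCT_TYPE) {
            current = current.sourceType;
        }
        return SERIALIZER_ROOTS.contains(current.root);
    }
}
```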





[GitHub] [flink] dawidwys commented on a change in pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12228:
URL: https://github.com/apache/flink/pull/12228#discussion_r427268991



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
##########
@@ -57,9 +59,8 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
       // TODO merge ScalarOperatorGens.generateEquals.
       val (equalsCode, equalsResult) = if (isInternalPrimitive(fieldType)) {
         ("", s"$leftFieldTerm == $rightFieldTerm")
-      } else if (isRowData(fieldType)) {
-        val equaliserGenerator = new EqualiserCodeGenerator(
-          fieldType.asInstanceOf[RowType].getChildren.asScala.toArray)
+      } else if (isCompositeType(fieldType)) {

Review comment:
       sorry that was a comment for myself :sweat_smile:  forgot to remove it...





[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c672d3d2f0654b3c5d1a18b20d73804e450382a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630164761


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 203fa4cd14fb09f270ee32344da183bf5113c7ff (Fri Oct 16 10:51:27 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741",
       "triggerID" : "5c672d3d2f0654b3c5d1a18b20d73804e450382a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c672d3d2f0654b3c5d1a18b20d73804e450382a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] twalthr closed pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #12228:
URL: https://github.com/apache/flink/pull/12228


   



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   ## CI report:
   
   * 5c672d3d2f0654b3c5d1a18b20d73804e450382a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1741) 
   * af3f37b2e0ceb50378d5996fc471024c952ebdd0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot commented on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630164761


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5c672d3d2f0654b3c5d1a18b20d73804e450382a (Mon May 18 12:56:50 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>



[GitHub] [flink] twalthr commented on a change in pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12228:
URL: https://github.com/apache/flink/pull/12228#discussion_r427245994



##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
##########
@@ -42,12 +43,24 @@
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link DataTypeUtils}.
  */
 public class DataTypeUtilsTest {
+
+	@Test
+	public void testIsInternalClass() {
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT()));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT().notNull().bridgedTo(int.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));

Review comment:
       I wanted to test a regular `Row`
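       Judging from the assertions in this diff, `isInternal` compares the class a data type is bridged to against the internal data structure for its logical type, treating a primitive such as `int` like its boxed form. The following is a minimal, dependency-free Java sketch of that idea, not Flink's implementation: `LogicalKind`, the stub `Row` and `RowData` classes, and `isInternal` below are hypothetical stand-ins. Under these assumptions, a `ROW` bridged to a regular `Row` would be reported as not internal, which is presumably what the intended `Row` assertion would check.

```java
/**
 * Hypothetical, dependency-free model of the check exercised by
 * DataTypeUtilsTest#testIsInternalClass. None of these names are Flink APIs.
 */
public class IsInternalSketch {

    /** Hypothetical stand-in for two logical type roots. */
    public enum LogicalKind { INT, ROW }

    /** Hypothetical stub for Flink's internal row structure (RowData). */
    public static final class RowData {}

    /** Hypothetical stub for Flink's external row class (Row). */
    public static final class Row {}

    /** Assumed internal data structure for each logical kind. */
    static Class<?> internalClassOf(LogicalKind kind) {
        switch (kind) {
            case INT:
                return Integer.class;
            case ROW:
                return RowData.class;
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    /** Boxes the only primitive this sketch needs (int); other classes pass through. */
    static Class<?> boxed(Class<?> clazz) {
        if (clazz == int.class) {
            return Integer.class;
        }
        return clazz;
    }

    /** A type counts as internal if its (boxed) bridged class equals the internal class. */
    public static boolean isInternal(LogicalKind kind, Class<?> bridgedTo) {
        return boxed(bridgedTo) == internalClassOf(kind);
    }

    public static void main(String[] args) {
        System.out.println(isInternal(LogicalKind.INT, Integer.class)); // true
        System.out.println(isInternal(LogicalKind.INT, int.class));     // true
        System.out.println(isInternal(LogicalKind.ROW, RowData.class)); // true
        System.out.println(isInternal(LogicalKind.ROW, Row.class));     // false
    }
}
```

       Comparing `Class` objects with `==` is sufficient in this sketch because each class is represented by a single `Class` instance within one class loader.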





[GitHub] [flink] flinkbot commented on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   ## CI report:
   
   * 5c672d3d2f0654b3c5d1a18b20d73804e450382a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] twalthr commented on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630834686


   @dawidwys I updated the PR. 



[GitHub] [flink] flinkbot edited a comment on pull request #12228: [FLINK-17541][table] Support inline structured types

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12228:
URL: https://github.com/apache/flink/pull/12228#issuecomment-630171014


   ## CI report:
   
   * af3f37b2e0ceb50378d5996fc471024c952ebdd0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1844) 
   * 203fa4cd14fb09f270ee32344da183bf5113c7ff UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

