You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/19 09:12:06 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #12228: [FLINK-17541][table] Support inline structured types

dawidwys commented on a change in pull request #12228:
URL: https://github.com/apache/flink/pull/12228#discussion_r427120645



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
##########
@@ -65,32 +80,55 @@ public static LogicalType removeTimeAttributes(LogicalType logicalType) {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return Long.class;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return TimestampData.class;
 			case FLOAT:
 				return Float.class;
 			case DOUBLE:
 				return Double.class;
-			case CHAR:
-			case VARCHAR:
-				return StringData.class;
-			case DECIMAL:
-				return DecimalData.class;
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return TimestampData.class;
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return ArrayData.class;
-			case MAP:
 			case MULTISET:
+			case MAP:
 				return MapData.class;
 			case ROW:
+			case STRUCTURED_TYPE:
 				return RowData.class;
-			case BINARY:
-			case VARBINARY:
-				return byte[].class;
+			case DISTINCT_TYPE:
+				return toInternalConversionClass(((DistinctType) type).getSourceType());
 			case RAW:
 				return RawValueData.class;
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
+			default:
+				throw new IllegalArgumentException();
+		}
+	}
+
+	/**
+	 * Converts the given composite type to a {@link RowType}.
+	 *
+	 * @see LogicalTypeChecks#isCompositeType(LogicalType)
+	 */
+	public static RowType toRowType(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case ROW:
+				return (RowType) type;
+			case STRUCTURED_TYPE:
+				final StructuredType structuredType = (StructuredType) type;
+				return new RowType(
+					structuredType.isNullable(),
+					structuredType.getAttributes().stream()
+						.map(a -> new RowField(a.getName(), a.getType(), a.getDescription().orElse(null)))
+						.collect(Collectors.toList()));
+			case DISTINCT_TYPE:
+				return toRowType(((DistinctType) type).getSourceType());
 			default:
-				throw new UnsupportedOperationException("Unsupported type: " + type);
+				throw new IllegalArgumentException();

Review comment:
       Let's add the type to the error message.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
##########
@@ -65,32 +80,55 @@ public static LogicalType removeTimeAttributes(LogicalType logicalType) {
 			case BIGINT:
 			case INTERVAL_DAY_TIME:
 				return Long.class;
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				return TimestampData.class;
 			case FLOAT:
 				return Float.class;
 			case DOUBLE:
 				return Double.class;
-			case CHAR:
-			case VARCHAR:
-				return StringData.class;
-			case DECIMAL:
-				return DecimalData.class;
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return TimestampData.class;
+			case TIMESTAMP_WITH_TIME_ZONE:
+				throw new UnsupportedOperationException();
 			case ARRAY:
 				return ArrayData.class;
-			case MAP:
 			case MULTISET:
+			case MAP:
 				return MapData.class;
 			case ROW:
+			case STRUCTURED_TYPE:
 				return RowData.class;
-			case BINARY:
-			case VARBINARY:
-				return byte[].class;
+			case DISTINCT_TYPE:
+				return toInternalConversionClass(((DistinctType) type).getSourceType());
 			case RAW:
 				return RawValueData.class;
+			case NULL:
+			case SYMBOL:
+			case UNRESOLVED:
+			default:
+				throw new IllegalArgumentException();

Review comment:
       Let's add the type to the error message.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
##########
@@ -161,117 +163,116 @@ object CodeGenUtils {
   // works, but for boxed types we need this:
   // Float a = 1.0f;
   // Byte b = (byte)(float) a;
+  @tailrec
   def primitiveTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER => "int"
-    case BIGINT => "long"
-    case SMALLINT => "short"
+    // ordered by type root definition
+    case BOOLEAN => "boolean"
     case TINYINT => "byte"
+    case SMALLINT => "short"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => "int"
+    case BIGINT | INTERVAL_DAY_TIME => "long"
     case FLOAT => "float"
     case DOUBLE => "double"
-    case BOOLEAN => "boolean"
-
-    case DATE => "int"
-    case TIME_WITHOUT_TIME_ZONE => "int"
-    case INTERVAL_YEAR_MONTH => "int"
-    case INTERVAL_DAY_TIME => "long"
-
+    case DISTINCT_TYPE => primitiveTypeTermForType(t.asInstanceOf[DistinctType].getSourceType)
     case _ => boxedTypeTermForType(t)
   }
 
+  @tailrec
   def boxedTypeTermForType(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER => className[JInt]
-    case BIGINT => className[JLong]
-    case SMALLINT => className[JShort]
+    // ordered by type root definition
+    case CHAR | VARCHAR => BINARY_STRING
+    case BOOLEAN => className[JBoolean]
+    case BINARY | VARBINARY => "byte[]"
+    case DECIMAL => className[DecimalData]
     case TINYINT => className[JByte]
+    case SMALLINT => className[JShort]
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => className[JInt]
+    case BIGINT | INTERVAL_DAY_TIME => className[JLong]
     case FLOAT => className[JFloat]
     case DOUBLE => className[JDouble]
-    case BOOLEAN => className[JBoolean]
-
-    case DATE => className[JInt]
-    case TIME_WITHOUT_TIME_ZONE => className[JInt]
-    case INTERVAL_YEAR_MONTH => className[JInt]
-    case INTERVAL_DAY_TIME => className[JLong]
-
-    case VARCHAR | CHAR => BINARY_STRING
-    case VARBINARY | BINARY => "byte[]"
-
-    case DECIMAL => className[DecimalData]
+    case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[TimestampData]
+    case TIMESTAMP_WITH_TIME_ZONE => throw new UnsupportedOperationException
     case ARRAY => className[ArrayData]
     case MULTISET | MAP => className[MapData]
-    case ROW => className[RowData]
+    case ROW | STRUCTURED_TYPE => className[RowData]
     case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[TimestampData]
-
+    case DISTINCT_TYPE => boxedTypeTermForType(t.asInstanceOf[DistinctType].getSourceType)
+    case NULL => className[JObject] // special case for untyped null literals
     case RAW => className[BinaryRawValueData[_]]
-
-    // special case for untyped null literals
-    case NULL => className[JObject]
+    case SYMBOL | UNRESOLVED => throw new IllegalArgumentException()
   }
 
   /**
     * Gets the default value for a primitive type, and null for generic types
     */
+  @tailrec
   def primitiveDefaultValue(t: LogicalType): String = t.getTypeRoot match {
-    case INTEGER | TINYINT | SMALLINT => "-1"
-    case BIGINT => "-1L"
+    // ordered by type root definition
+    case CHAR | VARCHAR => s"$BINARY_STRING.EMPTY_UTF8"
+    case BOOLEAN => "false"
+    case TINYINT | SMALLINT | INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH => "-1"
+    case BIGINT | INTERVAL_DAY_TIME => "-1L"
     case FLOAT => "-1.0f"
     case DOUBLE => "-1.0d"
-    case BOOLEAN => "false"
-    case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8"
 
-    case DATE | TIME_WITHOUT_TIME_ZONE => "-1"
-    case INTERVAL_YEAR_MONTH => "-1"
-    case INTERVAL_DAY_TIME => "-1L"
+    case DISTINCT_TYPE => primitiveDefaultValue(t.asInstanceOf[DistinctType].getSourceType)
 
     case _ => "null"
   }
 
-  /**
-    * If it's internally compatible, don't need to DataStructure converter.
-    * clazz != classOf[Row] => Row can only infer GenericType[Row].
-    */
-  def isInternalClass(t: DataType): Boolean = {
-    val clazz = t.getConversionClass
-    clazz != classOf[Object] && clazz != classOf[Row] &&
-        (classOf[RowData].isAssignableFrom(clazz) ||
-            clazz == toInternalConversionClass(fromDataTypeToLogicalType(t)))
-  }
-
+  @tailrec
   def hashCodeForType(
-      ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
-    case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
-    case TINYINT => s"${className[JByte]}.hashCode($term)"
-    case SMALLINT => s"${className[JShort]}.hashCode($term)"
-    case INTEGER => s"${className[JInt]}.hashCode($term)"
-    case BIGINT => s"${className[JLong]}.hashCode($term)"
+      ctx: CodeGeneratorContext,
+      t: LogicalType,
+      term: String)
+    : String = t.getTypeRoot match {
+    // ordered by type root definition
+    case VARCHAR | CHAR =>
+      s"$term.hashCode()"
+    case BOOLEAN =>
+      s"${className[JBoolean]}.hashCode($term)"
+    case BINARY | VARBINARY =>
+      s"${className[MurmurHashUtil]}.hashUnsafeBytes($term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
+    case DECIMAL =>
+      s"$term.hashCode()"
+    case TINYINT =>
+      s"${className[JByte]}.hashCode($term)"
+    case SMALLINT =>
+      s"${className[JShort]}.hashCode($term)"
+    case INTEGER | DATE | TIME_WITHOUT_TIME_ZONE | INTERVAL_YEAR_MONTH =>
+      s"${className[JInt]}.hashCode($term)"
+    case BIGINT | INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
     case FLOAT => s"${className[JFloat]}.hashCode($term)"
     case DOUBLE => s"${className[JDouble]}.hashCode($term)"
-    case VARCHAR | CHAR => s"$term.hashCode()"
-    case VARBINARY | BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
-      s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
-    case DECIMAL => s"$term.hashCode()"
-    case DATE => s"${className[JInt]}.hashCode($term)"
-    case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
     case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
       s"$term.hashCode()"
-    case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
+    case TIMESTAMP_WITH_TIME_ZONE | ARRAY | MULTISET | MAP =>
+      throw new UnsupportedOperationException(s"Unsupported type for hashing: $t")
     case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
-    case ARRAY => throw new IllegalArgumentException(s"Not support type to hash: $t")
-    case ROW =>
-      val rowType = t.asInstanceOf[RowType]
+    case ROW | STRUCTURED_TYPE =>
+      val fieldCount = getFieldCount(t)
       val subCtx = CodeGeneratorContext(ctx.tableConfig)
       val genHash = HashCodeGenerator.generateRowHash(
-        subCtx, rowType, "SubHashRow", (0 until rowType.getFieldCount).toArray)
+        subCtx, t, "SubHashRow", (0 until fieldCount).toArray)
       ctx.addReusableInnerClass(genHash.getClassName, genHash.getCode)
       val refs = ctx.addReusableObject(subCtx.references.toArray, "subRefs")
       val hashFunc = newName("hashFunc")
       ctx.addReusableMember(s"${classOf[HashFunction].getCanonicalName} $hashFunc;")
       ctx.addReusableInitStatement(s"$hashFunc = new ${genHash.getClassName}($refs);")
       s"$hashFunc.hashCode($term)"
+    case DISTINCT_TYPE =>
+      hashCodeForType(ctx, t.asInstanceOf[DistinctType].getSourceType, term)
     case RAW =>
-      val gt = t.asInstanceOf[TypeInformationRawType[_]]
-      val serTerm = ctx.addReusableObject(
-        gt.getTypeInformation.createSerializer(new ExecutionConfig), "serializer")
+      val serializer = t match {
+        case rt: RawType[_] =>
+          rt.getTypeSerializer
+        case tirt: TypeInformationRawType[_] =>
+          tirt.getTypeInformation.createSerializer(new ExecutionConfig)
+      }
+      val serTerm = ctx.addReusableObject(serializer, "serializer")
       s"$BINARY_RAW_VALUE.getJavaObjectFromRawValueData($term, $serTerm).hashCode()"
+    case NULL | SYMBOL | UNRESOLVED =>
+      throw new IllegalArgumentException()

Review comment:
       Same as above: let's add the type to the error message instead of throwing a bare `IllegalArgumentException()`.

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
##########
@@ -42,12 +43,24 @@
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link DataTypeUtils}.
  */
 public class DataTypeUtilsTest {
+
+	@Test
+	public void testIsInternalClass() {
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT()));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.INT().notNull().bridgedTo(int.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));
+		assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));

Review comment:
       Duplicated row... Did you mean `StructuredType` by chance?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
##########
@@ -57,9 +59,8 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
       // TODO merge ScalarOperatorGens.generateEquals.
       val (equalsCode, equalsResult) = if (isInternalPrimitive(fieldType)) {
         ("", s"$leftFieldTerm == $rightFieldTerm")
-      } else if (isRowData(fieldType)) {
-        val equaliserGenerator = new EqualiserCodeGenerator(
-          fieldType.asInstanceOf[RowType].getChildren.asScala.toArray)
+      } else if (isCompositeType(fieldType)) {

Review comment:
       TODO: double-check that `isCompositeType(fieldType)` covers exactly the same cases here as the previous `isRowData(fieldType)` check did.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
##########
@@ -360,16 +361,21 @@ object AggCodeGenHelper {
 
     aggBufferExprs.zip(initAggBufferExprs).map {
       case (aggBufVar, initExpr) =>
-        val resultCode = aggBufVar.resultType.getTypeRoot match {
-          case VARCHAR | CHAR | ROW | ARRAY | MULTISET | MAP =>
-            val serializer = InternalSerializers.create(
-              aggBufVar.resultType, new ExecutionConfig)
+
+        @tailrec
+        def getResultCode(t: LogicalType): String = t.getTypeRoot match {

Review comment:
       Can we move this method out of `genInitFlatAggregateBuffer`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org