You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/13 12:42:07 UTC

[GitHub] [flink] beyond1920 commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

beyond1920 commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r872348182


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;

Review Comment:
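   Did you forget to call `ctx.reuseLocalVariableCode()` here? Without it, any reusable local variables registered on `ctx` while generating the element hash code will not be declared in the generated `hashCode` method.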



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm += $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateMapHash(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val keyTypeTerm = primitiveTypeTermForType(keyType)
+    val valueTypeTerm = primitiveTypeTermForType(valueType)
+    val keys = newName("keys")
+    val values = newName("values")
+    val keyIsNull = newName("keyIsNull")
+    val keyFieldTerm = newName("keyFieldTerm")
+    val valueIsNull = newName("valueIsNull")
+    val valueFieldTerm = newName("valueFieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate key and value hash code body firstly
+    val keyElementHashBody = hashCodeForType(ctx, keyType, keyFieldTerm)
+    val valueElementHashBody = hashCodeForType(ctx, valueType, valueFieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          $ARRAY_DATA $keys = $inputTerm.keyArray();

Review Comment:
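   Did you forget to call `ctx.reuseLocalVariableCode()` here? Without it, any reusable local variables registered on `ctx` while generating the key/value hash code will not be declared in the generated `hashCode` method.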



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##########
@@ -344,6 +353,34 @@ object CodeGenUtils {
 
   // -------------------------- Method & Enum ---------------------------------------
 
+  def genHashForArray(ctx: CodeGeneratorContext, elementType: LogicalType, term: String): String = {
+    val subCtx = CodeGeneratorContext(ctx.tableConfig)
+    val genHash =
+      HashCodeGenerator.generateArrayHash(subCtx, elementType, "SubHashArray")
+    ctx.addReusableInnerClass(genHash.getClassName, genHash.getCode)
+    val refs = ctx.addReusableObject(subCtx.references.toArray, "subRefs")
+    val hashFunc = newName("hashFunc")
+    ctx.addReusableMember(s"${classOf[HashFunction].getCanonicalName} $hashFunc;")
+    ctx.addReusableInitStatement(s"$hashFunc = new ${genHash.getClassName}($refs);")
+    s"$hashFunc.hashCode($term)"
+  }
+
+  def genHashForMap(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      term: String): String = {
+    val subCtx = CodeGeneratorContext(ctx.tableConfig)

Review Comment:
   This code duplicates `genHashForArray`; could we extract the shared logic into a common helper?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -18,18 +18,26 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.data.GenericRowData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, RowType, VarBinaryType}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, GenericRowData, StringData}
+import org.apache.flink.table.types.logical.{ArrayType, BigIntType, IntType, MapType, MultisetType, RowType, VarBinaryType, VarCharType}
 
-import org.junit.{Assert, Test}
+import org.junit.{Assert, Rule, Test}
+import org.junit.rules.ExpectedException
+
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /** Test for [[HashCodeGenerator]]. */
 class HashCodeGeneratorTest {
 
   private val classLoader = Thread.currentThread().getContextClassLoader
 
+  var expectedException: ExpectedException = ExpectedException.none

Review Comment:
   Could you please add an ITCase in SQL that covers GROUP BY on array-type and map-type fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org