Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/12 12:53:21 UTC

[GitHub] [flink] lsyldliu opened a new pull request, #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

lsyldliu opened a new pull request, #19709:
URL: https://github.com/apache/flink/pull/19709

   ## What is the purpose of the change
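   Support complex types (such as ARRAY, MULTISET and MAP) as hash shuffle keys. Previously, hash shuffling by a field of such a type threw an UnsupportedOperationException.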
   
   ## Brief change log
   
     - *Generate hash functions for ArrayData and MapData so that complex types can be used as shuffle hash keys*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added unit tests in HashCodeGeneratorTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong merged pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #19709:
URL: https://github.com/apache/flink/pull/19709




[GitHub] [flink] beyond1920 commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

beyond1920 commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r872348182


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;

Review Comment:
   Did you forget to call `ctx.reuseLocalVariableCode()` here?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm += $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateMapHash(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val keyTypeTerm = primitiveTypeTermForType(keyType)
+    val valueTypeTerm = primitiveTypeTermForType(valueType)
+    val keys = newName("keys")
+    val values = newName("values")
+    val keyIsNull = newName("keyIsNull")
+    val keyFieldTerm = newName("keyFieldTerm")
+    val valueIsNull = newName("valueIsNull")
+    val valueFieldTerm = newName("valueFieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate key and value hash code body firstly
+    val keyElementHashBody = hashCodeForType(ctx, keyType, keyFieldTerm)
+    val valueElementHashBody = hashCodeForType(ctx, valueType, valueFieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          $ARRAY_DATA $keys = $inputTerm.keyArray();

Review Comment:
   Did you forget to call `ctx.reuseLocalVariableCode()` here?
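The `hashCode($ARRAY_DATA ...)` body that `generateArrayHash` emits in the diff above walks the array, skips null elements, and adds each element's hash to a running sum. Below is a plain-Java sketch of that loop for an INT element type, not Flink code: `Integer[]` stands in for `ArrayData`, and the class name `ArrayHashSketch` is made up for illustration. Treating an INT element's hash as the value itself is an assumption, although it agrees with the expected value of 13 for `[1, 5, 7]` in the PR's `testArrayHash`.

```java
/**
 * Plain-Java sketch (hypothetical, not Flink code) of the loop generated by generateArrayHash
 * for an INT element type. Integer[] stands in for ArrayData.
 */
final class ArrayHashSketch {

    static int arrayHash(Integer[] elements) {
        int hashCode = 0;
        for (int i = 0; i < elements.length; i++) {
            boolean isNull = elements[i] == null;
            if (!isNull) {
                // Assumption: an INT element hashes to its own value (consistent with 13 for [1, 5, 7]).
                hashCode += Integer.hashCode(elements[i]);
            }
        }
        return hashCode;
    }

    public static void main(String[] args) {
        System.out.println(arrayHash(new Integer[] {1, 5, 7}));    // 13
        System.out.println(arrayHash(new Integer[] {7, 5, 1}));    // 13 as well: the sum ignores order
        System.out.println(arrayHash(new Integer[] {1, null, 7})); // 8: null elements contribute nothing
    }
}
```

Because elements are simply summed, equal arrays always get equal hashes, which is what routing records to the same partition requires; the trade-off is that arrays differing only in element order collide.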



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##########
@@ -344,6 +353,34 @@ object CodeGenUtils {
 
   // -------------------------- Method & Enum ---------------------------------------
 
+  def genHashForArray(ctx: CodeGeneratorContext, elementType: LogicalType, term: String): String = {
+    val subCtx = CodeGeneratorContext(ctx.tableConfig)
+    val genHash =
+      HashCodeGenerator.generateArrayHash(subCtx, elementType, "SubHashArray")
+    ctx.addReusableInnerClass(genHash.getClassName, genHash.getCode)
+    val refs = ctx.addReusableObject(subCtx.references.toArray, "subRefs")
+    val hashFunc = newName("hashFunc")
+    ctx.addReusableMember(s"${classOf[HashFunction].getCanonicalName} $hashFunc;")
+    ctx.addReusableInitStatement(s"$hashFunc = new ${genHash.getClassName}($refs);")
+    s"$hashFunc.hashCode($term)"
+  }
+
+  def genHashForMap(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      term: String): String = {
+    val subCtx = CodeGeneratorContext(ctx.tableConfig)

Review Comment:
   This code duplicates `genHashForArray`. Could we extract the shared part to avoid the duplication?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -18,18 +18,26 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.data.GenericRowData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, RowType, VarBinaryType}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, GenericRowData, StringData}
+import org.apache.flink.table.types.logical.{ArrayType, BigIntType, IntType, MapType, MultisetType, RowType, VarBinaryType, VarCharType}
 
-import org.junit.{Assert, Test}
+import org.junit.{Assert, Rule, Test}
+import org.junit.rules.ExpectedException
+
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /** Test for [[HashCodeGenerator]]. */
 class HashCodeGeneratorTest {
 
   private val classLoader = Thread.currentThread().getContextClassLoader
 
+  var expectedException: ExpectedException = ExpectedException.none

Review Comment:
   Could you please add a SQL ITCase that covers GROUP BY on array-typed and map-typed fields?
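The ITCase requested here would check that rows with equal array (or map) values end up in the same group, which only holds if the grouping key is hashed and compared by content. As a plain-Java analogue, not Flink code: Java arrays inherit identity-based `equals`/`hashCode`, so two arrays with the same contents form two groups, while a content-based key such as `List<Integer>` forms one. `GroupByArrayKeySketch` and `countByKey` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java analogue (hypothetical): grouping works only with content-based key hashing. */
final class GroupByArrayKeySketch {

    // Counts rows per key, the way a GROUP BY would once rows are routed by key hash.
    static <K> Map<K, Integer> countByKey(List<K> keys) {
        Map<K, Integer> counts = new HashMap<>();
        for (K key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two rows whose grouping field holds equal array contents.
        int[] first = {1, 5, 7};
        int[] second = {1, 5, 7};

        // Java arrays use identity equals/hashCode, so equal contents still form two groups.
        System.out.println(countByKey(Arrays.asList(first, second)).size()); // 2

        // A content-based key (here List<Integer>) puts both rows into one group.
        List<Integer> firstKey = Arrays.asList(1, 5, 7);
        List<Integer> secondKey = Arrays.asList(1, 5, 7);
        System.out.println(countByKey(Arrays.asList(firstKey, secondKey)).size()); // 1
    }
}
```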





[GitHub] [flink] lsyldliu commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

lsyldliu commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1173089251

   @flinkbot run azure




[GitHub] [flink] lsyldliu commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

lsyldliu commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1127635151

   @beyond1920 Thanks for your review. I have updated the PR according to your feedback.




[GitHub] [flink] Tartarus0zm commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Tartarus0zm commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r909604325


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(new CodeGeneratorContext(new Configuration()), new IntType(), "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    Assert.assertEquals(13, hashFunc1.hashCode(array1))
+
+    // test complex map type of element
+    val hashFunc2 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        new MapType(new IntType(), new VarCharType()),
+        "name")
+      .newInstance(classLoader)
+
+    val mapData = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    val array2 = new GenericArrayData(Array[AnyRef](mapData))
+    Assert.assertEquals(357483069, hashFunc2.hashCode(array2))
+
+    // test complex row type of element
+    val hashFunc3 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val array3 = new GenericArrayData(
+      Array[AnyRef](GenericRowData.of(ji(5), jl(8)), GenericRowData.of(ji(25), jl(52))))
+    Assert.assertEquals(2430, hashFunc3.hashCode(array3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")

Review Comment:
   For the same reasons as above



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(new CodeGeneratorContext(new Configuration()), new IntType(), "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    Assert.assertEquals(13, hashFunc1.hashCode(array1))
+
+    // test complex map type of element
+    val hashFunc2 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        new MapType(new IntType(), new VarCharType()),
+        "name")
+      .newInstance(classLoader)
+
+    val mapData = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    val array2 = new GenericArrayData(Array[AnyRef](mapData))
+    Assert.assertEquals(357483069, hashFunc2.hashCode(array2))
+
+    // test complex row type of element
+    val hashFunc3 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val array3 = new GenericArrayData(
+      Array[AnyRef](GenericRowData.of(ji(5), jl(8)), GenericRowData.of(ji(25), jl(52))))
+    Assert.assertEquals(2430, hashFunc3.hashCode(array3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testMapHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        new VarCharType(),
+        "name")
+      .newInstance(classLoader)
+
+    val map1 = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    Assert.assertEquals(357483069, hashFunc1.hashCode(map1))
+
+    // test complex row type of value
+    val hashFunc2 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val map2 = new GenericMapData(
+      Map(1 -> GenericRowData.of(ji(5), jl(8)), 5 -> GenericRowData.of(ji(54), jl(78)), 10 -> null))
+    Assert.assertEquals(4763, hashFunc2.hashCode(map2))
+
+    // test complex array type of value
+    val hashFunc3 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        new ArrayType(new IntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val map3 = new GenericMapData(
+      Map(
+        1 -> new GenericArrayData(Array(1, 5, 7)),
+        5 -> new GenericArrayData(Array(2, 4, 8)),
+        10 -> null))
+    Assert.assertEquals(43, hashFunc3.hashCode(map3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage("MapData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])

Review Comment:
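   The code here will never be executed: the preceding `hashFunc3.hashCode(GenericRowData.of(null))` call already throws, which ends the test.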



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm += $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateMapHash(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val keyTypeTerm = primitiveTypeTermForType(keyType)
+    val valueTypeTerm = primitiveTypeTermForType(valueType)
+    val keys = newName("keys")
+    val values = newName("values")
+    val keyIsNull = newName("keyIsNull")
+    val keyFieldTerm = newName("keyFieldTerm")
+    val valueIsNull = newName("valueIsNull")
+    val valueFieldTerm = newName("valueFieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate key and value hash code body firstly
+    val keyElementHashBody = hashCodeForType(ctx, keyType, keyFieldTerm)
+    val valueElementHashBody = hashCodeForType(ctx, valueType, valueFieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          $ARRAY_DATA $keys = $inputTerm.keyArray();
+          $ARRAY_DATA $values = $inputTerm.valueArray();
+
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $keyIsNull = $keys.isNullAt($i);
+            if (!$keyIsNull) {
+              $keyTypeTerm $keyFieldTerm = ${rowFieldReadAccess(i, keys, keyType)};
+              $hashIntTerm += $keyElementHashBody;
+            }
+
+            boolean $valueIsNull = $values.isNullAt($i);
+            if(!$valueIsNull) {
+              $valueTypeTerm $valueFieldTerm = ${rowFieldReadAccess(i, values, valueType)};
+              $hashIntTerm += $valueElementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("MapData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for ArrayData.")}

Review Comment:
   It should be 
   >`MapData` hash function doesn't support to generate hash code for ArrayData. 
   not 
   >`ArrayData` hash function doesn't support to generate hash code for ArrayData.
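The `hashCode($MAP_DATA ...)` loop in the diff above reads the key array and the value array in parallel, skips nulls on both sides, and adds every key hash and every value hash to one sum. The plain-Java sketch below mirrors that loop for `MAP<INT, ARRAY<INT>>`, assuming parallel `Integer[]`/`Integer[][]` arrays in place of `MapData.keyArray()`/`valueArray()`; the class and method names are hypothetical. With these assumptions it reproduces the expected value of 43 that `testMapHash` asserts for `{1 -> [1, 5, 7], 5 -> [2, 4, 8], 10 -> null}`.

```java
/**
 * Plain-Java sketch (hypothetical, not Flink code) of the loop generated by generateMapHash.
 * Parallel arrays stand in for MapData.keyArray() and MapData.valueArray().
 */
final class MapHashSketch {

    // Hash of an ARRAY<INT> value: sum of its non-null elements (mirrors generateArrayHash).
    static int intArrayHash(Integer[] array) {
        int hash = 0;
        for (Integer element : array) {
            if (element != null) {
                hash += element;
            }
        }
        return hash;
    }

    static int mapHash(Integer[] keys, Integer[][] values) {
        int hashCode = 0;
        for (int i = 0; i < keys.length; i++) {
            if (keys[i] != null) {
                hashCode += Integer.hashCode(keys[i]);
            }
            if (values[i] != null) {
                hashCode += intArrayHash(values[i]);
            }
        }
        return hashCode;
    }

    public static void main(String[] args) {
        Integer[] keys = {1, 5, 10};
        Integer[][] values = {{1, 5, 7}, {2, 4, 8}, null};
        // (1 + 5 + 10) + (1 + 5 + 7) + (2 + 4 + 8) = 16 + 13 + 14 = 43
        System.out.println(mapHash(keys, values));
    }
}
```

Since keys and values feed the same sum, `{1 -> [2]}` and `{2 -> [1]}` hash identically. That is harmless for partitioning, where the only requirement is that equal maps get equal hashes, and the sum is also independent of entry order, which suits an unordered map.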



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")

Review Comment:
   It should be
   > RowData hash function doesn't support to generate hash code for `MapData`.
   not
   >RowData hash function doesn't support to generate hash code for `ArrayData`.
   
   However, because the ArrayData check above already throws, the test ends there: this code is never executed, so the MapData exception is never actually tested.
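One way to make every exception check in these tests run is to catch each expected exception locally instead of relying on a single `ExpectedException` rule, which stops the test at the first throw. JUnit 4.13 provides `Assert.assertThrows` for this purpose; the sketch below is a self-contained stand-in so that it runs without JUnit. `RowHashStub` and `expectThrows` are hypothetical, and `int[]`/`Map` replace `ArrayData`/`MapData`.

```java
import java.util.Collections;
import java.util.Map;

final class ExpectThrowsSketch {

    // Hypothetical stand-in for the generated RowData hash function: only the two unsupported
    // overloads are modeled, with int[] and Map standing in for ArrayData and MapData.
    static final class RowHashStub {
        int hashCode(int[] array) {
            throw new RuntimeException(
                "RowData hash function doesn't support to generate hash code for ArrayData.");
        }

        int hashCode(Map<?, ?> map) {
            throw new RuntimeException(
                "RowData hash function doesn't support to generate hash code for MapData.");
        }
    }

    // Runs the action, fails if it does not throw, and returns the exception so the caller can
    // check its message. Catching it here lets execution continue with the next check.
    static RuntimeException expectThrows(Runnable action) {
        try {
            action.run();
        } catch (RuntimeException e) {
            return e;
        }
        throw new AssertionError("expected a RuntimeException");
    }

    public static void main(String[] args) {
        RowHashStub hashFunc = new RowHashStub();

        RuntimeException onArray = expectThrows(() -> hashFunc.hashCode(new int[] {1}));
        RuntimeException onMap = expectThrows(() -> hashFunc.hashCode(Collections.emptyMap()));

        // Both checks execute, so a wrong expected message for MapData would now be caught.
        System.out.println(onArray.getMessage().endsWith("for ArrayData.")); // true
        System.out.println(onMap.getMessage().endsWith("for MapData."));     // true
    }
}
```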
   





[GitHub] [flink] lsyldliu commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

lsyldliu commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1206372061

   @wuchong Thanks for the review. I've addressed your comments.




[GitHub] [flink] lsyldliu commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

lsyldliu commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r873677386


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -18,18 +18,26 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.data.GenericRowData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, RowType, VarBinaryType}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, GenericRowData, StringData}
+import org.apache.flink.table.types.logical.{ArrayType, BigIntType, IntType, MapType, MultisetType, RowType, VarBinaryType, VarCharType}
 
-import org.junit.{Assert, Test}
+import org.junit.{Assert, Rule, Test}
+import org.junit.rules.ExpectedException
+
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /** Test for [[HashCodeGenerator]]. */
 class HashCodeGeneratorTest {
 
   private val classLoader = Thread.currentThread().getContextClassLoader
 
+  var expectedException: ExpectedException = ExpectedException.none

Review Comment:
   I have added an ITCase that uses an array-typed field in the GROUP BY clause.





[GitHub] [flink] lsyldliu commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r873676500


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;

Review Comment:
   All local variables here are declared explicitly, so there is no need to call `ctx.reuseLocalVariableCode()`.





[GitHub] [flink] wuchong commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
wuchong commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1206681944

   The failing case is tracked by FLINK-26721.




[GitHub] [flink] wuchong commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r934288651


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##########
@@ -311,22 +311,35 @@ object CodeGenUtils {
       case DOUBLE => s"${className[JDouble]}.hashCode($term)"
       case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
         s"$term.hashCode()"
-      case TIMESTAMP_WITH_TIME_ZONE | ARRAY | MULTISET | MAP =>
+      case TIMESTAMP_WITH_TIME_ZONE =>
         throw new UnsupportedOperationException(
           s"Unsupported type($t) to generate hash code," +
             s" the type($t) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.")
+      case ARRAY =>
+        val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+        val genHash =
+          HashCodeGenerator.generateArrayHash(
+            subCtx,
+            t.asInstanceOf[ArrayType].getElementType,
+            "SubHashArray")
+        genHashFunction(ctx, subCtx, genHash, term)
+      case MULTISET | MAP =>
+        val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+        val (keyType, valueType) = if (t.isInstanceOf[MultisetType]) {
+          (t.asInstanceOf[MultisetType].getElementType, new IntType())
+        } else {
+          (t.asInstanceOf[MapType].getKeyType, t.asInstanceOf[MapType].getValueType)
+        }

Review Comment:
   This can be replaced with a Scala pattern match:
   
   ```scala
   val (keyType, valueType) = t match {
     case multiset: MultisetType =>
       (multiset.getElementType, new IntType())
     case map: MapType =>
       (map.getKeyType, map.getValueType)
   }
   ```
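   The diff above hashes a MULTISET through the map path. The element type becomes the key type and `IntType` becomes the value type, so the multiset is treated as a map from element to occurrence count. The Java sketch below illustrates that idea. It assumes plain `java.util` collections in place of Flink's `MapData` and an order-independent sum of `keyHash ^ countHash` as the combining step. Both are assumptions, not the generated code, and the class name is hypothetical:

   ```java
   // Hypothetical helper: hashes a multiset by first counting occurrences,
   // mirroring the MULTISET -> MAP<element, INT> treatment in the diff.
   final class MultisetHashSketch {
       static int multisetHash(java.util.List<?> elements) {
           java.util.Map<Object, Integer> counts = new java.util.HashMap<>();
           for (Object e : elements) {
               counts.merge(e, 1, Integer::sum); // count occurrences of each element
           }
           int h = 0;
           for (java.util.Map.Entry<Object, Integer> entry : counts.entrySet()) {
               // Assumed combiner: sum of (keyHash ^ countHash); a null key hashes to 0.
               h += java.util.Objects.hashCode(entry.getKey()) ^ entry.getValue().hashCode();
           }
           return h;
       }
   }
   ```

   With this scheme `[a, b, a]` and `[b, a, a]` hash the same (`198`). `[a, b]` differs (`195`) because its count for `a` is 1.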



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,157 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          // This is inspired by hive & presto
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm = 31 * $hashIntTerm + $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateMapHash(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val keyTypeTerm = primitiveTypeTermForType(keyType)
+    val valueTypeTerm = primitiveTypeTermForType(valueType)
+    val keys = newName("keys")
+    val values = newName("values")
+    val keyIsNull = newName("keyIsNull")
+    val keyFieldTerm = newName("keyFieldTerm")
+    val valueIsNull = newName("valueIsNull")
+    val valueFieldTerm = newName("valueFieldTerm")
+    val keyHashTerm = newName("keyHashCode")
+    val valueHashTerm = newName("valueHashCode")
+    val hashIntTerm = newName("hashCode")
+    val i = newName("i")
+
+    // Generate key and value hash code body firstly
+    val keyElementHashBody = hashCodeForType(ctx, keyType, keyFieldTerm)
+    val valueElementHashBody = hashCodeForType(ctx, valueType, valueFieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          $ARRAY_DATA $keys = $inputTerm.keyArray();
+          $ARRAY_DATA $values = $inputTerm.valueArray();
+
+          int $keyHashTerm = 0;
+          int $valueHashTerm = 0;
+          int $hashIntTerm = 0;
+          
+          // This is inspired by hive & presto
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $keyIsNull = $keys.isNullAt($i);
+            if (!$keyIsNull) {
+              $keyHashTerm = 0;
+              $keyTypeTerm $keyFieldTerm = ${rowFieldReadAccess(i, keys, keyType)};
+              $keyHashTerm = $keyElementHashBody;
+            }
+
+            boolean $valueIsNull = $values.isNullAt($i);
+            if(!$valueIsNull) {
+              $valueHashTerm = 0;

Review Comment:
   Should `$keyHashTerm = 0;` and `$valueHashTerm = 0;` be executed before the `if` condition? Otherwise, if the current value is null, `$valueHashTerm` still holds the hash of the previous entry. Could you add a test for a map with multiple null values?
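   To make the concern concrete, here is a small Java sketch of the per-entry loop over parallel key/value arrays. The arrays stand in for `MapData.keyArray()`/`valueArray()`. The quoted diff is cut off before the combining step, so the sum of `keyHash ^ valueHash` used here is an assumption borrowed from the `java.util.Map.hashCode` contract. The class and method names are hypothetical:

   ```java
   // Hypothetical sketch: map hash over parallel key/value arrays.
   // Assumed combiner: hash += keyHash ^ valueHash (as in java.util.Map.hashCode).
   final class MapHashSketch {
       // Mirrors the reviewed structure: per-entry hashes are declared outside the
       // loop and only reassigned inside the null checks, so a null value silently
       // reuses the previous entry's value hash.
       static int staleHash(Object[] keys, Object[] values) {
           int keyHash = 0;
           int valueHash = 0;
           int hash = 0;
           for (int i = 0; i < keys.length; i++) {
               if (keys[i] != null) {
                   keyHash = keys[i].hashCode();
               }
               if (values[i] != null) {
                   valueHash = values[i].hashCode();
               }
               hash += keyHash ^ valueHash;
           }
           return hash;
       }

       // Suggested fix: reset both per-entry hashes before the null checks,
       // so a null key or value always contributes 0.
       static int resetHash(Object[] keys, Object[] values) {
           int hash = 0;
           for (int i = 0; i < keys.length; i++) {
               int keyHash = 0;
               int valueHash = 0;
               if (keys[i] != null) {
                   keyHash = keys[i].hashCode();
               }
               if (values[i] != null) {
                   valueHash = values[i].hashCode();
               }
               hash += keyHash ^ valueHash;
           }
           return hash;
       }
   }
   ```

   With keys `[1, 5, 10]` and values `["ron", null, null]`, `staleHash` folds the hash of `"ron"` into all three entries. `resetHash` agrees with `java.util.HashMap#hashCode` for the same entries.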
   



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,157 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;

Review Comment:
   Should `$hashIntTerm` start from `1` instead of `0`?
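   One reason to prefer `1`: with a seed of `0`, leading elements whose hash is `0` leave the accumulator at `0`, so `[0, 1]` and `[1]` collide. `java.util.Arrays.hashCode` starts from `1`. Below is a minimal Java sketch of the 31-based loop with a configurable seed. Null handling is omitted, and the names are hypothetical:

   ```java
   // Hypothetical sketch: 31-based polynomial array hash with a configurable seed.
   final class ArraySeedSketch {
       static int polyHash(int seed, int[] values) {
           int h = seed;
           for (int v : values) {
               h = 31 * h + Integer.hashCode(v); // Integer.hashCode(v) == v
           }
           return h;
       }
   }
   ```

   With seed `0`, `[1, 5, 7]` hashes to `1123`, the value asserted in `testArrayHash`. With seed `1` it hashes to `30914`, which matches `Arrays.hashCode(new int[] {1, 5, 7})`.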



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,157 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          // This is inspired by hive & presto
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm = 31 * $hashIntTerm + $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }

Review Comment:
   Please do not exceed 100 characters per line in Scala code.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,157 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          // This is inspired by hive & presto
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm = 31 * $hashIntTerm + $elementHashBody;
+            }

Review Comment:
   Should we consider null elements, e.g. use hash 0 for null elements instead of skipping them?
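   Below is a minimal Java sketch of the difference, assuming `Integer` elements. It uses a seed of `1`, whereas the quoted loop starts from `0`. The seed is changed here only so the null-as-0 variant can be compared directly with `java.util.Arrays.hashCode(Object[])`, which hashes a null element as 0. The names are hypothetical:

   ```java
   // Hypothetical sketch: two ways to treat null elements in a 31-based array hash.
   final class ArrayNullSketch {
       // Skips null elements, like the generated loop under review.
       static int skipNulls(Integer[] values) {
           int h = 1;
           for (Integer v : values) {
               if (v != null) {
                   h = 31 * h + v.hashCode();
               }
           }
           return h;
       }

       // Treats a null element as hash 0, like java.util.Arrays.hashCode(Object[]).
       static int nullAsZero(Integer[] values) {
           int h = 1;
           for (Integer v : values) {
               h = 31 * h + (v == null ? 0 : v.hashCode());
           }
           return h;
       }
   }
   ```

   Skipping nulls makes `[1, null, 5]` and `[1, 5]` collide (both give `997`). Treating null as `0` keeps them apart (`30757` vs `997`).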



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/HashFunction.java:
##########
@@ -18,12 +18,23 @@
 
 package org.apache.flink.table.runtime.generated;
 
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 
 /**
- * Interface for code generated hash code of {@link RowData}, which will select some fields to hash.
+ * Interface for code generated hash code of {@link RowData} which will select some fields to hash
+ * or {@link ArrayData} or {@link MapData}.
+ *
+ * <p>Due to Janino's support for generic type is not very friendly, so here can't introduce generic
+ * type for {@link HashFunction}, please see https://github.com/janino-compiler/janino/issues/109
+ * for details.

Review Comment:
   You can define generic types in the interface. The Janino limitation only means that the generated method parameters must be declared as `Object` instead of the generic type. That's not a problem: you can manually cast the input parameter to `RowData`/`ArrayData`/`MapData` in the first line of the method. See `org.apache.flink.table.planner.codegen.FunctionCodeGenerator#generateFunction`.
   
   I'd rather not overload many methods when a single method is enough.
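   Here is a rough Java sketch of that pattern. The names are hypothetical, and `int[]` stands in for `ArrayData` so the sketch compiles without Flink on the classpath. The interface is generic, while the "generated" class implements it as a raw type, declares its parameter as `Object`, and casts on the first line:

   ```java
   // Hypothetical generic interface, following the reviewer's suggestion.
   interface GenericHashFunction<T> {
       int hashCode(T input);
   }

   // Janino-friendly shape: raw interface, Object parameter, cast on the first line.
   final class GeneratedIntArrayHash implements GenericHashFunction {
       @Override
       public int hashCode(Object input) {
           int[] array = (int[]) input; // manual cast replaces the generic parameter
           int h = 1;
           for (int v : array) {
               h = 31 * h + Integer.hashCode(v);
           }
           return h;
       }
   }
   ```

   Callers still get a typed view (`GenericHashFunction<int[]>`). The raw `implements` clause only causes an unchecked-conversion warning at the assignment.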



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -41,16 +45,164 @@ class HashCodeGeneratorTest {
 
     val hashFunc2 = HashCodeGenerator
       .generateRowHash(
-        new CodeGeneratorContext(new Configuration, Thread.currentThread().getContextClassLoader),
+        new CodeGeneratorContext(new Configuration, classLoader),
         RowType.of(new IntType(), new BigIntType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)),
         "name",
         Array(1, 2, 0)
       )
       .newInstance(classLoader)
 
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
-    Assert.assertEquals(637, hashFunc1.hashCode(row))
-    Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+    assertEquals(637, hashFunc1.hashCode(row))
+    assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration, classLoader),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    assertEquals(1065781729, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    CommonTestUtils.assertThrows(
+      "RowData hash function doesn't support to generate hash code for ArrayData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(new GenericArrayData(Array(1)))
+    )
+
+    // test hash code for MapData
+    CommonTestUtils.assertThrows(
+      "RowData hash function doesn't support to generate hash code for MapData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(new GenericMapData(null)))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration(), classLoader),
+        new IntType(),
+        "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    assertEquals(1123, hashFunc1.hashCode(array1))
+
+    // test complex map type of element
+    val hashFunc2 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration(), classLoader),
+        new MapType(new IntType(), new VarCharType()),
+        "name")
+      .newInstance(classLoader)
+
+    val mapData = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    val array2 = new GenericArrayData(Array[AnyRef](mapData))
+    assertEquals(93178751, hashFunc2.hashCode(array2))
+
+    // test complex row type of element
+    val hashFunc3 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration(), classLoader),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val array3 = new GenericArrayData(
+      Array[AnyRef](GenericRowData.of(ji(5), jl(8)), GenericRowData.of(ji(25), jl(52))))
+    assertEquals(14520, hashFunc3.hashCode(array3))
+
+    // test hash code for RowData
+    CommonTestUtils.assertThrows(
+      "ArrayData hash function doesn't support to generate hash code for RowData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(GenericRowData.of(null))
+    )
+
+    // test hash code for MapData
+    CommonTestUtils.assertThrows(
+      "ArrayData hash function doesn't support to generate hash code for MapData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(new GenericMapData(null))
+    )
+  }
+
+  @Test
+  def testMapHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration(), classLoader),
+        new IntType(),
+        new VarCharType(),
+        "name")
+      .newInstance(classLoader)
+
+    val map1 = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))

Review Comment:
   Please test that two maps containing the same entries in a different order, including null values, have equal hash codes.
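   Here is a minimal Java sketch of what such a test checks, using parallel key/value arrays as a stand-in for `MapData` (hypothetical names). An order-independent combiner (sum of `keyHash ^ valueHash`, with null hashing to 0) gives the same result for both entry orders. A 31-based polynomial over the same entry hashes does not:

   ```java
   // Hypothetical sketch: order-independent vs order-dependent map hashing.
   final class MapOrderSketch {
       // Order-independent: sum of per-entry (keyHash ^ valueHash); null hashes to 0.
       static int unorderedHash(Object[] keys, Object[] values) {
           int h = 0;
           for (int i = 0; i < keys.length; i++) {
               h += java.util.Objects.hashCode(keys[i]) ^ java.util.Objects.hashCode(values[i]);
           }
           return h;
       }

       // Order-dependent: 31-based polynomial over the same per-entry hashes.
       static int orderedHash(Object[] keys, Object[] values) {
           int h = 0;
           for (int i = 0; i < keys.length; i++) {
               h = 31 * h + (java.util.Objects.hashCode(keys[i]) ^ java.util.Objects.hashCode(values[i]));
           }
           return h;
       }
   }
   ```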
   



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -41,16 +45,164 @@ class HashCodeGeneratorTest {
 
     val hashFunc2 = HashCodeGenerator
       .generateRowHash(
-        new CodeGeneratorContext(new Configuration, Thread.currentThread().getContextClassLoader),
+        new CodeGeneratorContext(new Configuration, classLoader),
         RowType.of(new IntType(), new BigIntType(), new VarBinaryType(VarBinaryType.MAX_LENGTH)),
         "name",
         Array(1, 2, 0)
       )
       .newInstance(classLoader)
 
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
-    Assert.assertEquals(637, hashFunc1.hashCode(row))
-    Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+    assertEquals(637, hashFunc1.hashCode(row))
+    assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration, classLoader),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    assertEquals(1065781729, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    CommonTestUtils.assertThrows(
+      "RowData hash function doesn't support to generate hash code for ArrayData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(new GenericArrayData(Array(1)))
+    )
+
+    // test hash code for MapData
+    CommonTestUtils.assertThrows(
+      "RowData hash function doesn't support to generate hash code for MapData.",
+      classOf[RuntimeException],
+      () => hashFunc3.hashCode(new GenericMapData(null)))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration(), classLoader),
+        new IntType(),
+        "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    assertEquals(1123, hashFunc1.hashCode(array1))

Review Comment:
   Asserting a raw hash value makes it hard to tell whether the result is right or wrong. It would be better to add relational assertions, e.g.
   
   ```scala
   val array1 = new GenericArrayData(Array(1, 5, 7))
   val array2 = new GenericArrayData(Array(1, 5, 7))
   val array3 = new GenericArrayData(Array[AnyRef](Int.box(1), null, Int.box(5), null, Int.box(7)))
   assertEquals(hashFunc1.hashCode(array1), hashFunc1.hashCode(array2))
   assertNotEquals(hashFunc1.hashCode(array1), hashFunc1.hashCode(array3))
   ```





[GitHub] [flink] flinkbot commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1124960579

   ## CI report:
   
   * 2f1e9e9b7954b9a03a80de6ad9e0cab52bfd5a3b UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] lsyldliu commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1125578866

   @beyond1920 Could you help review this PR?




[GitHub] [flink] lsyldliu commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r873688161


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -18,18 +18,26 @@
 package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.data.GenericRowData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, RowType, VarBinaryType}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, GenericRowData, StringData}
+import org.apache.flink.table.types.logical.{ArrayType, BigIntType, IntType, MapType, MultisetType, RowType, VarBinaryType, VarCharType}
 
-import org.junit.{Assert, Test}
+import org.junit.{Assert, Rule, Test}
+import org.junit.rules.ExpectedException
+
+import scala.collection.JavaConversions.mapAsJavaMap
 
 /** Test for [[HashCodeGenerator]]. */
 class HashCodeGeneratorTest {
 
   private val classLoader = Thread.currentThread().getContextClassLoader
 
+  var expectedException: ExpectedException = ExpectedException.none

Review Comment:
   I have looked into Spark and Presto. Spark does not allow a map type in JOIN or GROUP BY clauses because map is not an orderable type, but it does support a map as a DISTRIBUTE BY key. Presto supports a map as a GROUP BY key because it only implements hash-based aggregation. So I think we should support map-typed fields as shuffle keys, but not as sort keys, e.g. in ORDER BY or in sort-based joins and aggregations. Like Spark and Presto, we should support generating hash codes for maps. This may also be useful for the DISTRIBUTE BY clause, which we will support in 1.16.
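   The point that a shuffle key only needs a hash, not an ordering, can be sketched in Java as follows. The channel selection `Math.floorMod(hash, numChannels)` is a simplification assumed for illustration, not Flink's actual partitioner. `java.util.Map` stands in for `MapData`, and the class name is hypothetical:

   ```java
   // Hypothetical sketch: routing records by a map-typed shuffle key.
   final class MapKeyShuffleSketch {
       // Picks a downstream channel from the key's hash code only.
       // Map.hashCode() is the sum of its entry hashes, so iteration order does not matter.
       static int channelFor(java.util.Map<?, ?> key, int numChannels) {
           return Math.floorMod(key.hashCode(), numChannels);
       }
   }
   ```

   Two maps with the same entries land on the same channel regardless of insertion order. Neither map is `Comparable`, which mirrors the argument above that map keys suit hash-based shuffles but not sort keys.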





[GitHub] [flink] lsyldliu commented on pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #19709:
URL: https://github.com/apache/flink/pull/19709#issuecomment-1173031621

   @flinkbot run azure

