You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/29 13:11:18 UTC

[GitHub] [flink] Tartarus0zm commented on a diff in pull request #19709: [FLINK-25645][table-runtime] UnsupportedOperationException would thrown out when hash shuffle by a field with array type

Tartarus0zm commented on code in PR #19709:
URL: https://github.com/apache/flink/pull/19709#discussion_r909604325


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(new CodeGeneratorContext(new Configuration()), new IntType(), "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    Assert.assertEquals(13, hashFunc1.hashCode(array1))
+
+    // test complex map type of element
+    val hashFunc2 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        new MapType(new IntType(), new VarCharType()),
+        "name")
+      .newInstance(classLoader)
+
+    val mapData = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    val array2 = new GenericArrayData(Array[AnyRef](mapData))
+    Assert.assertEquals(357483069, hashFunc2.hashCode(array2))
+
+    // test complex row type of element
+    val hashFunc3 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val array3 = new GenericArrayData(
+      Array[AnyRef](GenericRowData.of(ji(5), jl(8)), GenericRowData.of(ji(25), jl(52))))
+    Assert.assertEquals(2430, hashFunc3.hashCode(array3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")

Review Comment:
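   The same problems as in the row-hash test apply here: the expected message should end with `MapData`, not `RowData`, and this check is never reached because the preceding `hashFunc3.hashCode(GenericRowData.of(null))` call already throws.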



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testArrayHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateArrayHash(new CodeGeneratorContext(new Configuration()), new IntType(), "name")
+      .newInstance(classLoader)
+
+    val array1 = new GenericArrayData(Array(1, 5, 7))
+    Assert.assertEquals(13, hashFunc1.hashCode(array1))
+
+    // test complex map type of element
+    val hashFunc2 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        new MapType(new IntType(), new VarCharType()),
+        "name")
+      .newInstance(classLoader)
+
+    val mapData = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    val array2 = new GenericArrayData(Array[AnyRef](mapData))
+    Assert.assertEquals(357483069, hashFunc2.hashCode(array2))
+
+    // test complex row type of element
+    val hashFunc3 = HashCodeGenerator
+      .generateArrayHash(
+        new CodeGeneratorContext(new Configuration()),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val array3 = new GenericArrayData(
+      Array[AnyRef](GenericRowData.of(ji(5), jl(8)), GenericRowData.of(ji(25), jl(52))))
+    Assert.assertEquals(2430, hashFunc3.hashCode(array3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "ArrayData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(new GenericMapData(null))
+  }
+
+  @Test
+  def testMapHash(): Unit = {
+    // test primitive type
+    val hashFunc1 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        new VarCharType(),
+        "name")
+      .newInstance(classLoader)
+
+    val map1 = new GenericMapData(
+      Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    Assert.assertEquals(357483069, hashFunc1.hashCode(map1))
+
+    // test complex row type of value
+    val hashFunc2 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        RowType.of(new IntType(), new BigIntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val map2 = new GenericMapData(
+      Map(1 -> GenericRowData.of(ji(5), jl(8)), 5 -> GenericRowData.of(ji(54), jl(78)), 10 -> null))
+    Assert.assertEquals(4763, hashFunc2.hashCode(map2))
+
+    // test complex array type of value
+    val hashFunc3 = HashCodeGenerator
+      .generateMapHash(
+        new CodeGeneratorContext(new Configuration()),
+        new IntType(),
+        new ArrayType(new IntType()),
+        "name")
+      .newInstance(classLoader)
+
+    val map3 = new GenericMapData(
+      Map(
+        1 -> new GenericArrayData(Array(1, 5, 7)),
+        5 -> new GenericArrayData(Array(2, 4, 8)),
+        10 -> null))
+    Assert.assertEquals(43, hashFunc3.hashCode(map3))
+
+    // test hash code for RowData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage("MapData hash function doesn't support to generate hash code for RowData.")
+    hashFunc3.hashCode(GenericRowData.of(null))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])

Review Comment:
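   The code here will not be executed, because the preceding `hashFunc3.hashCode(GenericRowData.of(null))` call already throws the expected exception and ends the test.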



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/HashCodeGenerator.scala:
##########
@@ -75,6 +75,146 @@ object HashCodeGenerator {
           return $resultTerm;
         }
 
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for ArrayData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("RowData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateArrayHash(
+      ctx: CodeGeneratorContext,
+      elementType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val typeTerm = primitiveTypeTermForType(elementType)
+    val isNull = newName("isNull")
+    val fieldTerm = newName("fieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate element hash code firstly
+    val elementHashBody = hashCodeForType(ctx, elementType, fieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $isNull = $inputTerm.isNullAt($i);
+            if (!$isNull) {
+              $typeTerm $fieldTerm = ${rowFieldReadAccess(i, inputTerm, elementType)};
+              $hashIntTerm += $elementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for MapData.")}
+        }
+
+        ${ctx.reuseInnerClassDefinitionCode()}
+      }
+    """.stripMargin
+
+    new GeneratedHashFunction(className, code, ctx.references.toArray, ctx.tableConfig)
+  }
+
+  def generateMapHash(
+      ctx: CodeGeneratorContext,
+      keyType: LogicalType,
+      valueType: LogicalType,
+      name: String): GeneratedHashFunction = {
+    val className = newName(name)
+    val baseClass = classOf[HashFunction]
+    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
+
+    val keyTypeTerm = primitiveTypeTermForType(keyType)
+    val valueTypeTerm = primitiveTypeTermForType(valueType)
+    val keys = newName("keys")
+    val values = newName("values")
+    val keyIsNull = newName("keyIsNull")
+    val keyFieldTerm = newName("keyFieldTerm")
+    val valueIsNull = newName("valueIsNull")
+    val valueFieldTerm = newName("valueFieldTerm")
+    val hashIntTerm = CodeGenUtils.newName("hashCode")
+    val i = newName("i")
+
+    // Generate key and value hash code body firstly
+    val keyElementHashBody = hashCodeForType(ctx, keyType, keyFieldTerm)
+    val valueElementHashBody = hashCodeForType(ctx, valueType, valueFieldTerm)
+    val code =
+      j"""
+      public class $className implements ${baseClass.getCanonicalName} {
+
+        ${ctx.reuseMemberCode()}
+
+        public $className(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public int hashCode($MAP_DATA $inputTerm) {
+          $ARRAY_DATA $keys = $inputTerm.keyArray();
+          $ARRAY_DATA $values = $inputTerm.valueArray();
+
+          int $hashIntTerm = 0;
+          for (int $i = 0; $i < $inputTerm.size(); $i++) {
+            boolean $keyIsNull = $keys.isNullAt($i);
+            if (!$keyIsNull) {
+              $keyTypeTerm $keyFieldTerm = ${rowFieldReadAccess(i, keys, keyType)};
+              $hashIntTerm += $keyElementHashBody;
+            }
+
+            boolean $valueIsNull = $values.isNullAt($i);
+            if(!$valueIsNull) {
+              $valueTypeTerm $valueFieldTerm = ${rowFieldReadAccess(i, values, valueType)};
+              $hashIntTerm += $valueElementHashBody;
+            }
+          }
+
+          return $hashIntTerm;
+        }
+
+        @Override
+        public int hashCode($ROW_DATA $inputTerm) {
+          ${genThrowException("MapData hash function doesn't support to generate hash code for RowData.")}
+        }
+
+        @Override
+        public int hashCode($ARRAY_DATA $inputTerm) {
+          ${genThrowException("ArrayData hash function doesn't support to generate hash code for ArrayData.")}

Review Comment:
   It should be 
   >`MapData` hash function doesn't support to generate hash code for ArrayData. 
   not 
   >`ArrayData` hash function doesn't support to generate hash code for ArrayData.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/HashCodeGeneratorTest.scala:
##########
@@ -51,6 +59,145 @@ class HashCodeGeneratorTest {
     val row = GenericRowData.of(ji(5), jl(8), Array[Byte](1, 5, 6))
     Assert.assertEquals(637, hashFunc1.hashCode(row))
     Assert.assertEquals(136516167, hashFunc2.hashCode(row))
+
+    // test row with nested array and map type
+    val hashFunc3 = HashCodeGenerator
+      .generateRowHash(
+        new CodeGeneratorContext(new Configuration),
+        RowType.of(
+          new IntType(),
+          new ArrayType(new IntType()),
+          new MultisetType(new IntType()),
+          new MapType(new IntType(), new VarCharType())),
+        "name",
+        Array(1, 2, 0, 3)
+      )
+      .newInstance(classLoader)
+
+    val row3 = GenericRowData.of(
+      ji(5),
+      new GenericArrayData(Array(1, 5, 7)),
+      new GenericMapData(Map(1 -> null, 5 -> null, 10 -> null)),
+      new GenericMapData(
+        Map(1 -> StringData.fromString("ron"), 5 -> StringData.fromString("danny"), 10 -> null))
+    )
+    Assert.assertEquals(368915957, hashFunc3.hashCode(row3))
+
+    // test hash code for ArrayData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")
+    hashFunc3.hashCode(new GenericArrayData(Array(1)))
+
+    // test hash code for MapData
+    thrown.expect(classOf[RuntimeException])
+    thrown.expectMessage(
+      "RowData hash function doesn't support to generate hash code for ArrayData.")

Review Comment:
   It should be
   > RowData hash function doesn't support to generate hash code for `MapData`.
   not
   >RowData hash function doesn't support to generate hash code for `ArrayData`.
   
   However, because the preceding ArrayData check (`hashFunc3.hashCode(new GenericArrayData(Array(1)))`) already throws, the code here is never executed, so the MapData exception is not actually tested.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org