You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/18 10:29:12 UTC

[flink] branch release-1.9 updated: [FLINK-13738][blink-table-planner] Fix NegativeArraySizeException in LongHybridHashTable

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new fd07573  [FLINK-13738][blink-table-planner] Fix NegativeArraySizeException in LongHybridHashTable
fd07573 is described below

commit fd07573aaf79090fa85ef48ae8fa10d823915f90
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 16 11:16:29 2019 +0200

    [FLINK-13738][blink-table-planner] Fix NegativeArraySizeException in LongHybridHashTable
    
    This closes #9462
---
 .../runtime/batch/sql/join/JoinITCase.scala        | 27 +++++++++++++++++++++-
 .../runtime/hashtable/LongHybridHashTable.java     |  6 ++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index cde5f86..7f5b2c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.runtime.batch.sql.join
 
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, LONG_TYPE_INFO}
 import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.table.api.Types
@@ -69,6 +69,31 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
       ))
   }
 
+  @Test
+  def testLongJoinWithBigRange(): Unit = {
+    registerCollection(
+      "inputT1",
+      Seq(
+        row(Long.box(Long.MaxValue), Double.box(1)),
+        row(Long.box(Long.MinValue), Double.box(1))),
+      new RowTypeInfo(LONG_TYPE_INFO, DOUBLE_TYPE_INFO),
+      "a, b")
+    registerCollection(
+      "inputT2",
+      Seq(
+        row(Long.box(Long.MaxValue), Double.box(1)),
+        row(Long.box(Long.MinValue), Double.box(1))),
+      new RowTypeInfo(LONG_TYPE_INFO, DOUBLE_TYPE_INFO),
+      "c, d")
+
+    checkResult(
+      "SELECT a, b, c, d FROM inputT1, inputT2 WHERE a = c",
+      Seq(
+        row(Long.box(Long.MaxValue), Double.box(1), Long.box(Long.MaxValue), Double.box(1)),
+        row(Long.box(Long.MinValue), Double.box(1), Long.box(Long.MinValue), Double.box(1))
+      ))
+  }
+
   @Ignore // TODO support lazy from source
   @Test
   def testLongHashJoinGenerator(): Unit = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
index 117d5a6..a37b7ec 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
@@ -206,7 +206,11 @@ public abstract class LongHybridHashTable extends BaseHybridHashTable {
 		}
 
 		long range = maxKey - minKey + 1;
-		if (range <= recordCount * 4 || range <= segmentSize / 8) {
+
+		// 1.range is negative mean: range is to big to overflow
+		// 2.range is zero, maybe the max is Long.Max, and the min is Long.Min,
+		// so we should not use dense mode too.
+		if (range > 0 && (range <= recordCount * 4 || range <= segmentSize / 8)) {
 
 			// try to request memory.
 			int buffers = (int) Math.ceil(((double) (range * 8)) / segmentSize);