You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/06/27 17:18:57 UTC

flink git commit: [FLINK-9524] [table] Check for expired clean-up timers to prevent NPE in ProcTimeBoundedRangeOver.

Repository: flink
Updated Branches:
  refs/heads/master a161606a6 -> eb525b7f8


[FLINK-9524] [table] Check for expired clean-up timers to prevent NPE in ProcTimeBoundedRangeOver.

This closes #6180.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb525b7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb525b7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb525b7f

Branch: refs/heads/master
Commit: eb525b7f889600fa4f4dbbdbee161848e5d570dd
Parents: a161606
Author: Yan Zhou <yz...@gmail.com>
Authored: Mon Jun 18 11:42:29 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Jun 27 19:17:58 2018 +0200

----------------------------------------------------------------------
 .../aggregate/ProcTimeBoundedRangeOver.scala     | 11 +++++++++--
 .../runtime/harness/OverWindowHarnessTest.scala  | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eb525b7f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index e00c7ac..591b942 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -132,6 +132,14 @@ class ProcTimeBoundedRangeOver(
 
     val currentTime = timestamp - 1
     var i = 0
+    // get the list of elements of current proctime
+    val currentElements = rowMapState.get(currentTime)
+
+    // Expired clean-up timers pass the needToCleanupState() check.
+    // Perform a null check to verify that we have data to process.
+    if (null == currentElements) {
+      return
+    }
 
     // initialize the accumulators
     var accumulators = accumulatorState.value()
@@ -172,8 +180,7 @@ class ProcTimeBoundedRangeOver(
       i += 1
     }
 
-    // get the list of elements of current proctime
-    val currentElements = rowMapState.get(currentTime)
+
     // add current elements to aggregator. Multiple elements might
     // have arrived in the same proctime
     // the same accumulator value will be computed for all elements

http://git-wip-us.apache.org/repos/asf/flink/blob/eb525b7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 6f6fc0e..218cae2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -208,6 +208,21 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.setProcessingTime(11006)
 
+    // test for clean-up timer NPE
+    testHarness.setProcessingTime(20000)
+
+    // timer registered for 23000
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(0L: JLong, "ccc", 10L: JLong), change = true)))
+
+    // update clean-up timer to 25500. Previous timer should not clean up
+    testHarness.setProcessingTime(22500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(0L: JLong, "ccc", 20L: JLong), change = true)))
+
+    // 23000 clean-up timer should fire but not fail with an NPE
+    testHarness.setProcessingTime(23001)
+
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -241,6 +256,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true)))
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
 
     verify(expectedOutput, result, new RowResultSortComparator())