You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/06/27 17:18:57 UTC
flink git commit: [FLINK-9524] [table] Check for expired clean-up timers to prevent NPE in ProcTimeBoundedRangeOver.
Repository: flink
Updated Branches:
refs/heads/master a161606a6 -> eb525b7f8
[FLINK-9524] [table] Check for expired clean-up timers to prevent NPE in ProcTimeBoundedRangeOver.
This closes #6180.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb525b7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb525b7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb525b7f
Branch: refs/heads/master
Commit: eb525b7f889600fa4f4dbbdbee161848e5d570dd
Parents: a161606
Author: Yan Zhou <yz...@gmail.com>
Authored: Mon Jun 18 11:42:29 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Jun 27 19:17:58 2018 +0200
----------------------------------------------------------------------
.../aggregate/ProcTimeBoundedRangeOver.scala | 11 +++++++++--
.../runtime/harness/OverWindowHarnessTest.scala | 19 +++++++++++++++++++
2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eb525b7f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index e00c7ac..591b942 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -132,6 +132,14 @@ class ProcTimeBoundedRangeOver(
val currentTime = timestamp - 1
var i = 0
+ // get the list of elements of current proctime
+ val currentElements = rowMapState.get(currentTime)
+
+ // Expired clean-up timers pass the needToCleanupState() check.
+ // Perform a null check to verify that we have data to process.
+ if (null == currentElements) {
+ return
+ }
// initialize the accumulators
var accumulators = accumulatorState.value()
@@ -172,8 +180,7 @@ class ProcTimeBoundedRangeOver(
i += 1
}
- // get the list of elements of current proctime
- val currentElements = rowMapState.get(currentTime)
+
// add current elements to aggregator. Multiple elements might
// have arrived in the same proctime
// the same accumulator value will be computed for all elements
http://git-wip-us.apache.org/repos/asf/flink/blob/eb525b7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 6f6fc0e..218cae2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -208,6 +208,21 @@ class OverWindowHarnessTest extends HarnessTestBase{
testHarness.setProcessingTime(11006)
+ // test for clean-up timer NPE
+ testHarness.setProcessingTime(20000)
+
+ // timer registered for 23000
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(0L: JLong, "ccc", 10L: JLong), change = true)))
+
+ // update clean-up timer to 25500. Previous timer should not clean up
+ testHarness.setProcessingTime(22500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(0L: JLong, "ccc", 20L: JLong), change = true)))
+
+ // 23000 clean-up timer should fire but not fail with an NPE
+ testHarness.setProcessingTime(23001)
+
val result = testHarness.getOutput
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -241,6 +256,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true)))
expectedOutput.add(new StreamRecord(
CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true)))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(0L: JLong, "ccc", 10L: JLong, 10L: JLong, 10L: JLong), change = true)))
+ expectedOutput.add(new StreamRecord(
+ CRow(Row.of(0L: JLong, "ccc", 20L: JLong, 10L: JLong, 20L: JLong), change = true)))
verify(expectedOutput, result, new RowResultSortComparator())