You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/05/10 12:55:38 UTC
[flink] branch master updated: [FLINK-22604][table-runtime-blink]
Fix NPE on bundle close when task failover after a failed task open
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6daa30f [FLINK-22604][table-runtime-blink] Fix NPE on bundle close when task failover after a failed task open
6daa30f is described below
commit 6daa30f555b14c2c9cd8d8109dfd32bf7161dc7f
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon May 10 20:55:18 2021 +0800
[FLINK-22604][table-runtime-blink] Fix NPE on bundle close when task failover after a failed task open
This closes #15863
---
.../harness/GroupAggregateHarnessTest.scala | 73 +++++++-----
.../planner/runtime/harness/HarnessTestBase.scala | 19 ++-
.../runtime/harness/OverAggregateHarnessTest.scala | 73 +++++++-----
.../harness/TableAggregateHarnessTest.scala | 58 +++++----
.../harness/WindowAggregateHarnessTest.scala | 130 +++++++++++++++++----
.../window/LocalSlicingWindowAggOperator.java | 4 +-
.../bundle/AbstractMapBundleOperator.java | 2 +-
7 files changed, 256 insertions(+), 103 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 0b8ada6..969473a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -19,16 +19,20 @@
package org.apache.flink.table.planner.runtime.harness
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
import org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, TABLE_EXEC_MINIBATCH_ENABLED, TABLE_EXEC_MINIBATCH_SIZE}
import org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY
+import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.CountNullNonNull
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.types.Row
import org.apache.flink.types.RowKind._
@@ -46,7 +50,7 @@ import scala.collection.mutable
@RunWith(classOf[Parameterized])
class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode)
- extends HarnessTestBase(mode) {
+ extends HarnessTestBase(mode) {
@Before
override def before(): Unit = {
@@ -100,7 +104,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
testHarness.setStateTtlProcessingTime(1)
// insertion
- testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))
+ testHarness.processElement(binaryRecord(INSERT, "aaa", 1L: JLong))
expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong))
// insertion
@@ -144,7 +148,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
expectedOutput.add(binaryRecord(INSERT, "eee", 6L: JLong))
// retract
- testHarness.processElement(binaryRecord(INSERT,"aaa", 7L: JLong))
+ testHarness.processElement(binaryRecord(INSERT, "aaa", 7L: JLong))
expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 9L: JLong))
expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 16L: JLong))
testHarness.processElement(binaryRecord(INSERT, "bbb", 3L: JLong))
@@ -160,28 +164,8 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
@Test
def testAggregationWithDistinct(): Unit = {
- val data = new mutable.MutableList[(String, String, Long)]
- val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
- tEnv.createTemporaryView("T", t)
- tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
-
- val sql =
- """
- |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
- |FROM T
- |GROUP BY a
- """.stripMargin
- val t1 = tEnv.sqlQuery(sql)
-
- tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
- val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
- val assertor = new RowDataHarnessAssertor(
- Array(
- DataTypes.STRING().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.STRING().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.BIGINT().getLogicalType))
+ val (testHarness, outputTypes) = createAggregationWithDistinct
+ val assertor = new RowDataHarnessAssertor(outputTypes)
testHarness.open()
@@ -191,7 +175,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
testHarness.setStateTtlProcessingTime(1)
// insertion
- testHarness.processElement(binaryRecord(INSERT,"aaa", "a1", 1L: JLong))
+ testHarness.processElement(binaryRecord(INSERT, "aaa", "a1", 1L: JLong))
expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong, "1|0", 1L: JLong, 1L: JLong))
// insertion
@@ -240,6 +224,41 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
testHarness.close()
}
+ private def createAggregationWithDistinct()
+ : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+ val data = new mutable.MutableList[(String, String, Long)]
+ val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.createTemporaryView("T", t)
+ tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
+
+ val sql =
+ """
+ |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
+ |FROM T
+ |GROUP BY a
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+ val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
+ val outputTypes = Array(
+ DataTypes.STRING().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.STRING().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.BIGINT().getLogicalType)
+
+ (testHarness, outputTypes)
+ }
+
+ @Test
+ def testCloseWithoutOpen(): Unit = {
+ val (testHarness, outputType) = createAggregationWithDistinct
+ testHarness.setup(new RowDataSerializer(outputType: _*))
+ // simulate a failover after a failed task open (e.g., stuck on initializing)
+ // expect no exception happens
+ testHarness.close()
+ }
}
object GroupAggregateHarnessTest {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
index 191ba5f..d7dc473 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, PartitionTransformation}
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, OneInputStreamOperatorTestHarness}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.JLong
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
@@ -86,6 +86,19 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
.asInstanceOf[KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData]]
}
+ def createHarnessTesterForNoState(
+ ds: DataStream[_],
+ prefixOperatorName: String)
+ : OneInputStreamOperatorTestHarness[RowData, RowData] = {
+ val transformation = extractExpectedTransformation(
+ ds.javaStream.getTransformation,
+ prefixOperatorName)
+ val processOperator = transformation.getOperator
+ .asInstanceOf[OneInputStreamOperator[Any, Any]]
+ new OneInputStreamOperatorTestHarness(processOperator)
+ .asInstanceOf[OneInputStreamOperatorTestHarness[RowData, RowData]]
+ }
+
private def extractExpectedTransformation(
t: Transformation[_],
prefixOperatorName: String): OneInputTransformation[_, _] = {
@@ -96,6 +109,8 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
} else {
extractExpectedTransformation(one.getInputs.get(0), prefixOperatorName)
}
+ case p: PartitionTransformation[_] =>
+ extractExpectedTransformation(p.getInputs.get(0), prefixOperatorName)
case _ => throw new Exception(
s"Can not find the expected $prefixOperatorName transformation")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index 191a895..7c1f566 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -20,14 +20,17 @@ package org.apache.flink.table.planner.runtime.harness
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
import org.apache.flink.table.runtime.util.StreamRecordUtils.{binaryrow, row}
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.types.Row
import org.junit.runner.RunWith
@@ -52,34 +55,8 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
@Test
def testProcTimeBoundedRowsOver(): Unit = {
-
- val data = new mutable.MutableList[(Long, String, Long)]
- val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
- tEnv.registerTable("T", t)
-
- val sql =
- """
- |SELECT currtime, b, c,
- | min(c) OVER
- | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
- | max(c) OVER
- | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
- |FROM T
- """.stripMargin
- val t1 = tEnv.sqlQuery(sql)
-
- tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
- val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
- val assertor = new RowDataHarnessAssertor(
- Array(
- DataTypes.BIGINT().getLogicalType,
- DataTypes.STRING().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.BIGINT().getLogicalType,
- DataTypes.BIGINT().getLogicalType))
-
+ val (testHarness, outputType) = createProcTimeBoundedRowsOver
+ val assertor = new RowDataHarnessAssertor(outputType)
testHarness.open()
// register cleanup timer with 3001
@@ -161,6 +138,36 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
testHarness.close()
}
+ private def createProcTimeBoundedRowsOver()
+ : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+ val data = new mutable.MutableList[(Long, String, Long)]
+ val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT currtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ val outputType = Array(
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.STRING().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.BIGINT().getLogicalType)
+ (testHarness, outputType)
+ }
+
/**
* NOTE: all elements at the same proc timestamp have the same value per key
*/
@@ -940,4 +947,12 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
testHarness.close()
}
+
+ @Test
+ def testCloseWithoutOpen(): Unit = {
+ val (testHarness, outputType) = createProcTimeBoundedRowsOver
+ testHarness.setup(new RowDataSerializer(outputType: _*))
+ // simulate a failover after a failed task open, expect no exception happens
+ testHarness.close()
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index 32b2688..d9dcf7f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -18,23 +18,27 @@
package org.apache.flink.table.planner.runtime.harness
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.utils.{Top3WithMapView, Top3WithRetractInput}
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
import org.apache.flink.table.runtime.util.StreamRecordUtils.{deleteRecord, insertRecord}
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.types.Row
+
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{Before, Test}
+import java.lang.{Integer => JInt}
import java.time.Duration
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
@@ -117,22 +121,8 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
@Test
def testTableAggregateWithRetractInput(): Unit = {
- val top3 = new Top3WithRetractInput
- tEnv.registerFunction("top3", top3)
- val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
- val resultTable = source
- .groupBy('a)
- .select('b.sum as 'b)
- .flatAggregate(top3('b) as ('b1, 'b2))
- .select('b1, 'b2)
-
- tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
- val testHarness = createHarnessTester(
- resultTable.toRetractStream[Row], "GroupTableAggregate")
- val assertor = new RowDataHarnessAssertor(
- Array(
- DataTypes.INT().getLogicalType,
- DataTypes.INT().getLogicalType))
+ val (testHarness, outputTypes) = createTableAggregateWithRetract
+ val assertor = new RowDataHarnessAssertor(outputTypes)
testHarness.open()
val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -170,4 +160,32 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
testHarness.close()
}
+
+ private def createTableAggregateWithRetract()
+ : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+ val top3 = new Top3WithRetractInput
+ tEnv.registerFunction("top3", top3)
+ val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+ val resultTable = source
+ .groupBy('a)
+ .select('b.sum as 'b)
+ .flatAggregate(top3('b) as('b1, 'b2))
+ .select('b1, 'b2)
+
+ tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+ val testHarness = createHarnessTester(
+ resultTable.toRetractStream[Row], "GroupTableAggregate")
+ val outputTypes = Array(
+ DataTypes.INT().getLogicalType,
+ DataTypes.INT().getLogicalType)
+ (testHarness, outputTypes)
+ }
+
+ @Test
+ def testCloseWithoutOpen(): Unit = {
+ val (testHarness, outputTypes) = createTableAggregateWithRetract
+ testHarness.setup(new RowDataSerializer(outputTypes: _*))
+ // simulate a failover after a failed task open, expect no exception happens
+ testHarness.close()
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index 0bb81a7..6587af3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -19,18 +19,21 @@
package org.apache.flink.table.planner.runtime.harness
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.data.{RowData, TimestampData}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.runtime.utils.TestData
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
import org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills
-
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.types.Row
import org.apache.flink.types.RowKind.INSERT
@@ -84,6 +87,32 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
*/
@Test
def testProcessingTimeTumbleWindow(): Unit = {
+ val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator
+ val assertor = new RowDataHarnessAssertor(outputTypes)
+
+ testHarness.open()
+ ingestData(testHarness)
+ val expected = new ConcurrentLinkedQueue[Object]()
+ expected.add(record("a", 4L, 5.0D, 2L,
+ localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")))
+ expected.add(record("a", 1L, null, 1L,
+ localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
+ expected.add(record("b", 2L, 6.0D, 2L,
+ localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
+ expected.add(record("b", 1L, 4.0D, 1L,
+ localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")))
+ expected.add(record("b", 1L, 3.0D, 1L,
+ localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
+ expected.add(record(null, 1L, 7.0D, 0L,
+ localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput)
+
+ testHarness.close()
+ }
+
+ private def createProcessingTimeTumbleWindowOperator()
+ : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
val sql =
"""
|SELECT
@@ -100,34 +129,15 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
val t1 = tEnv.sqlQuery(sql)
val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
// window aggregate put window properties at the end of aggs
- val assertor = new RowDataHarnessAssertor(
+ val outputTypes =
Array(
DataTypes.STRING().getLogicalType,
DataTypes.BIGINT().getLogicalType,
DataTypes.DOUBLE().getLogicalType,
DataTypes.BIGINT().getLogicalType,
DataTypes.TIMESTAMP_LTZ(3).getLogicalType,
- DataTypes.TIMESTAMP_LTZ(3).getLogicalType))
-
- testHarness.open()
- ingestData(testHarness)
- val expected = new ConcurrentLinkedQueue[Object]()
- expected.add(record("a", 4L, 5.0D, 2L,
- localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")))
- expected.add(record("a", 1L, null, 1L,
- localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
- expected.add(record("b", 2L, 6.0D, 2L,
- localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
- expected.add(record("b", 1L, 4.0D, 1L,
- localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")))
- expected.add(record("b", 1L, 3.0D, 1L,
- localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
- expected.add(record(null, 1L, 7.0D, 0L,
- localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
-
- assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput)
-
- testHarness.close()
+ DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+ (testHarness, outputTypes)
}
/**
@@ -266,6 +276,80 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
testHarness.close()
}
+ @Test
+ def testCloseWithoutOpen(): Unit = {
+ val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator
+ testHarness.setup(new RowDataSerializer(outputTypes: _*))
+ // simulate a failover after a failed task open, expect no exception happens
+ testHarness.close()
+ }
+
+ /**
+ * Processing time window doesn't support two-phase, so add a single two-phase test.
+ */
+ @Test
+ def testTwoPhaseWindowAggregateCloseWithoutOpen(): Unit = {
+ val timestampDataId = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE T2 (
+ | `ts` STRING,
+ | `int` INT,
+ | `double` DOUBLE,
+ | `float` FLOAT,
+ | `bigdec` DECIMAL(10, 2),
+ | `string` STRING,
+ | `name` STRING,
+ | `rowtime` AS
+ | TO_TIMESTAMP(`ts`),
+ | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '${timestampDataId}',
+ | 'failing-source' = 'false'
+ |)
+ |""".stripMargin)
+
+ tEnv.getConfig.getConfiguration.setString(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+
+ val sql =
+ """
+ |SELECT
+ | `name`,
+ | window_start,
+ | window_end,
+ | COUNT(*),
+ | MAX(`double`),
+ | COUNT(DISTINCT `string`)
+ |FROM TABLE(
+ | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+ |GROUP BY `name`, window_start, window_end
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+ val stream: DataStream[Row] = t1.toAppendStream[Row]
+
+ val testHarness = createHarnessTesterForNoState(stream, "LocalWindowAggregate")
+ // window aggregate put window properties at the end of aggs
+ val outputTypes = Array(
+ DataTypes.STRING().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.DOUBLE().getLogicalType,
+ DataTypes.BIGINT().getLogicalType,
+ DataTypes.TIMESTAMP_LTZ(3).getLogicalType,
+ DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+ testHarness.setup(new RowDataSerializer(outputTypes: _*))
+
+ // simulate a failover after a failed task open, expect no exception happens
+ testHarness.close()
+
+ val testHarness1 = createHarnessTester(stream, "GlobalWindowAggregate")
+ testHarness1.setup(new RowDataSerializer(outputTypes: _*))
+
+ // simulate a failover after a failed task open, expect no exception happens
+ testHarness1.close()
+ }
+
/**
* Ingests testing data, the input schema is [name, double, string, proctime].
* We follow the test data in [[TestData.windowDataWithTimestamp]] to have the same produced
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index b044db7..63f184a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -143,7 +143,9 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
super.close();
collector = null;
functionsClosed = true;
- windowBuffer.close();
+ if (windowBuffer != null) {
+ windowBuffer.close();
+ }
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
index 4d10646..eba6e32 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractMapBundleOperator<K, V, IN, OUT> extends AbstractS
@Override
public void finishBundle() throws Exception {
- if (!bundle.isEmpty()) {
+ if (bundle != null && !bundle.isEmpty()) {
numOfElements = 0;
function.finishBundle(bundle, collector);
bundle.clear();