Posted to commits@flink.apache.org by lz...@apache.org on 2021/05/10 12:55:38 UTC

[flink] branch master updated: [FLINK-22604][table-runtime-blink] Fix NPE on bundle close when task failover after a failed task open

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6daa30f  [FLINK-22604][table-runtime-blink] Fix NPE on bundle close when task failover after a failed task open
6daa30f is described below

commit 6daa30f555b14c2c9cd8d8109dfd32bf7161dc7f
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon May 10 20:55:18 2021 +0800

    [FLINK-22604][table-runtime-blink] Fix NPE on bundle close when task failover after a failed task open
    
    This closes #15863
---
 .../harness/GroupAggregateHarnessTest.scala        |  73 +++++++-----
 .../planner/runtime/harness/HarnessTestBase.scala  |  19 ++-
 .../runtime/harness/OverAggregateHarnessTest.scala |  73 +++++++-----
 .../harness/TableAggregateHarnessTest.scala        |  58 +++++----
 .../harness/WindowAggregateHarnessTest.scala       | 130 +++++++++++++++++----
 .../window/LocalSlicingWindowAggOperator.java      |   4 +-
 .../bundle/AbstractMapBundleOperator.java          |   2 +-
 7 files changed, 256 insertions(+), 103 deletions(-)
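
The gist of the fix: both operators assign internal state in open() (the bundle map in
AbstractMapBundleOperator, the windowBuffer in LocalSlicingWindowAggOperator), but on a
task failover the runtime still runs the close path for every operator, even when open()
failed partway through or was never reached. The close logic therefore has to tolerate
fields that were never initialized. A minimal sketch of the lifecycle issue and the
defensive guard (a hypothetical SketchOperator, not the actual Flink classes):

    // Hypothetical, simplified operator: fields assigned in open() may still be
    // null when close() runs, because a task that fails during open() is still
    // disposed, and disposal calls close().
    class SketchOperator {
        private java.util.Map<Object, Object> bundle; // assigned in open()

        public void open() throws Exception {
            // not reached if the task fails before or while opening
            bundle = new java.util.HashMap<>();
        }

        public void close() throws Exception {
            // null guard keeps close() safe after a failed open(),
            // mirroring the checks added in the diff below
            if (bundle != null && !bundle.isEmpty()) {
                bundle.clear();
            }
        }
    }

The new testCloseWithoutOpen harness tests exercise exactly this path: they setup() the
operator with a serializer, never open() it, and expect close() to complete without
throwing.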

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 0b8ada6..969473a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -19,16 +19,20 @@
 package org.apache.flink.table.planner.runtime.harness
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, TABLE_EXEC_MINIBATCH_ENABLED, TABLE_EXEC_MINIBATCH_SIZE}
 import org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY
+import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.CountNullNonNull
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.types.Row
 import org.apache.flink.types.RowKind._
 
@@ -46,7 +50,7 @@ import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode)
-    extends HarnessTestBase(mode) {
+  extends HarnessTestBase(mode) {
 
   @Before
   override def before(): Unit = {
@@ -100,7 +104,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     testHarness.setStateTtlProcessingTime(1)
 
     // insertion
-    testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "aaa", 1L: JLong))
     expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong))
 
     // insertion
@@ -144,7 +148,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     expectedOutput.add(binaryRecord(INSERT, "eee", 6L: JLong))
 
     // retract
-    testHarness.processElement(binaryRecord(INSERT,"aaa", 7L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "aaa", 7L: JLong))
     expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 9L: JLong))
     expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 16L: JLong))
     testHarness.processElement(binaryRecord(INSERT, "bbb", 3L: JLong))
@@ -160,28 +164,8 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
 
   @Test
   def testAggregationWithDistinct(): Unit = {
-    val data = new mutable.MutableList[(String, String, Long)]
-    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.createTemporaryView("T", t)
-    tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
-
-    val sql =
-      """
-        |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
-        |FROM T
-        |GROUP BY a
-      """.stripMargin
-    val t1 = tEnv.sqlQuery(sql)
-
-    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType))
+    val (testHarness, outputTypes) = createAggregationWithDistinct
+    val assertor = new RowDataHarnessAssertor(outputTypes)
 
     testHarness.open()
 
@@ -191,7 +175,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     testHarness.setStateTtlProcessingTime(1)
 
     // insertion
-    testHarness.processElement(binaryRecord(INSERT,"aaa", "a1", 1L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a1", 1L: JLong))
     expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong, "1|0", 1L: JLong, 1L: JLong))
 
     // insertion
@@ -240,6 +224,41 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     testHarness.close()
   }
 
+  private def createAggregationWithDistinct()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val data = new mutable.MutableList[(String, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T", t)
+    tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
+
+    val sql =
+      """
+        |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
+        |FROM T
+        |GROUP BY a
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+    val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
+    val outputTypes = Array(
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType)
+
+    (testHarness, outputTypes)
+  }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputType) = createAggregationWithDistinct
+    testHarness.setup(new RowDataSerializer(outputType: _*))
+    // simulate a failover after a failed task open (e.g., stuck during initialization);
+    // expect no exception to be thrown
+    testHarness.close()
+  }
 }
 
 object GroupAggregateHarnessTest {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
index 191ba5f..d7dc473 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, PartitionTransformation}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, OneInputStreamOperatorTestHarness}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.JLong
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
@@ -86,6 +86,19 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
       .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData]]
   }
 
+  def createHarnessTesterForNoState(
+      ds: DataStream[_],
+      prefixOperatorName: String)
+    : OneInputStreamOperatorTestHarness[RowData, RowData] = {
+    val transformation = extractExpectedTransformation(
+      ds.javaStream.getTransformation,
+      prefixOperatorName)
+    val processOperator = transformation.getOperator
+      .asInstanceOf[OneInputStreamOperator[Any, Any]]
+    new OneInputStreamOperatorTestHarness(processOperator)
+      .asInstanceOf[OneInputStreamOperatorTestHarness[RowData, RowData]]
+  }
+
   private def extractExpectedTransformation(
       t: Transformation[_],
       prefixOperatorName: String): OneInputTransformation[_, _] = {
@@ -96,6 +109,8 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
         } else {
           extractExpectedTransformation(one.getInputs.get(0), prefixOperatorName)
         }
+      case p: PartitionTransformation[_] =>
+        extractExpectedTransformation(p.getInputs.get(0), prefixOperatorName)
       case _ => throw new Exception(
         s"Can not find the expected $prefixOperatorName transformation")
     }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index 191a895..7c1f566 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -20,14 +20,17 @@ package org.apache.flink.table.planner.runtime.harness
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.{binaryrow, row}
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.types.Row
 
 import org.junit.runner.RunWith
@@ -52,34 +55,8 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
 
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
-
-    val data = new mutable.MutableList[(Long, String, Long)]
-    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
-    tEnv.registerTable("T", t)
-
-    val sql =
-      """
-        |SELECT currtime, b, c,
-        | min(c) OVER
-        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
-        | max(c) OVER
-        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
-        |FROM T
-      """.stripMargin
-    val t1 = tEnv.sqlQuery(sql)
-
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
-    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType))
-
+    val (testHarness, outputType) = createProcTimeBoundedRowsOver
+    val assertor = new RowDataHarnessAssertor(outputType)
     testHarness.open()
 
     // register cleanup timer with 3001
@@ -161,6 +138,36 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     testHarness.close()
   }
 
+  private def createProcTimeBoundedRowsOver()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val data = new mutable.MutableList[(Long, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT currtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val outputType = Array(
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType)
+    (testHarness, outputType)
+  }
+
   /**
     * NOTE: all elements at the same proc timestamp have the same value per key
     */
@@ -940,4 +947,12 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputType) = createProcTimeBoundedRowsOver
+    testHarness.setup(new RowDataSerializer(outputType: _*))
+    // simulate a failover after a failed task open, expect no exception to be thrown
+    testHarness.close()
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index 32b2688..d9dcf7f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -18,23 +18,27 @@
 
 package org.apache.flink.table.planner.runtime.harness
 
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.utils.{Top3WithMapView, Top3WithRetractInput}
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.{deleteRecord, insertRecord}
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.types.Row
+
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
+import java.lang.{Integer => JInt}
 import java.time.Duration
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable
 
@@ -117,22 +121,8 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
 
   @Test
   def testTableAggregateWithRetractInput(): Unit = {
-    val top3 = new Top3WithRetractInput
-    tEnv.registerFunction("top3", top3)
-    val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
-    val resultTable = source
-      .groupBy('a)
-      .select('b.sum as 'b)
-      .flatAggregate(top3('b) as ('b1, 'b2))
-      .select('b1, 'b2)
-
-    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(
-      resultTable.toRetractStream[Row], "GroupTableAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.INT().getLogicalType,
-        DataTypes.INT().getLogicalType))
+    val (testHarness, outputTypes) = createTableAggregateWithRetract
+    val assertor = new RowDataHarnessAssertor(outputTypes)
 
     testHarness.open()
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -170,4 +160,32 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  private def createTableAggregateWithRetract()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val top3 = new Top3WithRetractInput
+    tEnv.registerFunction("top3", top3)
+    val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    val resultTable = source
+      .groupBy('a)
+      .select('b.sum as 'b)
+      .flatAggregate(top3('b) as ('b1, 'b2))
+      .select('b1, 'b2)
+
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+    val testHarness = createHarnessTester(
+      resultTable.toRetractStream[Row], "GroupTableAggregate")
+    val outputTypes = Array(
+      DataTypes.INT().getLogicalType,
+      DataTypes.INT().getLogicalType)
+    (testHarness, outputTypes)
+  }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputTypes) = createTableAggregateWithRetract
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+    // simulate a failover after a failed task open, expect no exception to be thrown
+    testHarness.close()
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index 0bb81a7..6587af3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -19,18 +19,21 @@
 package org.apache.flink.table.planner.runtime.harness
 
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.data.{RowData, TimestampData}
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.runtime.utils.TestData
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
 import org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills
-
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.types.Row
 import org.apache.flink.types.RowKind.INSERT
 
@@ -84,6 +87,32 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
    */
   @Test
   def testProcessingTimeTumbleWindow(): Unit = {
+    val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator
+    val assertor = new RowDataHarnessAssertor(outputTypes)
+
+    testHarness.open()
+    ingestData(testHarness)
+    val expected = new ConcurrentLinkedQueue[Object]()
+    expected.add(record("a", 4L, 5.0D, 2L,
+      localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")))
+    expected.add(record("a", 1L, null, 1L,
+      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
+    expected.add(record("b", 2L, 6.0D, 2L,
+      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
+    expected.add(record("b", 1L, 4.0D, 1L,
+      localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")))
+    expected.add(record("b", 1L, 3.0D, 1L,
+      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
+    expected.add(record(null, 1L, 7.0D, 0L,
+      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  private def createProcessingTimeTumbleWindowOperator()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
     val sql =
       """
         |SELECT
@@ -100,34 +129,15 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
     val t1 = tEnv.sqlQuery(sql)
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate")
     // window aggregate puts window properties at the end of aggs
-    val assertor = new RowDataHarnessAssertor(
+    val outputTypes =
       Array(
         DataTypes.STRING().getLogicalType,
         DataTypes.BIGINT().getLogicalType,
         DataTypes.DOUBLE().getLogicalType,
         DataTypes.BIGINT().getLogicalType,
         DataTypes.TIMESTAMP_LTZ(3).getLogicalType,
-        DataTypes.TIMESTAMP_LTZ(3).getLogicalType))
-
-    testHarness.open()
-    ingestData(testHarness)
-    val expected = new ConcurrentLinkedQueue[Object]()
-    expected.add(record("a", 4L, 5.0D, 2L,
-      localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")))
-    expected.add(record("a", 1L, null, 1L,
-      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
-    expected.add(record("b", 2L, 6.0D, 2L,
-      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
-    expected.add(record("b", 1L, 4.0D, 1L,
-      localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")))
-    expected.add(record("b", 1L, 3.0D, 1L,
-      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
-    expected.add(record(null, 1L, 7.0D, 0L,
-      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
-
-    assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput)
-
-    testHarness.close()
+        DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+    (testHarness, outputTypes)
   }
 
   /**
@@ -266,6 +276,80 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
     testHarness.close()
   }
 
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+    // simulate a failover after a failed task open, expect no exception to be thrown
+    testHarness.close()
+  }
+
+  /**
+   * Processing-time windows don't support two-phase aggregation, so add a single two-phase test.
+   */
+  @Test
+  def testTwoPhaseWindowAggregateCloseWithoutOpen(): Unit = {
+    val timestampDataId = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE T2 (
+         | `ts` STRING,
+         | `int` INT,
+         | `double` DOUBLE,
+         | `float` FLOAT,
+         | `bigdec` DECIMAL(10, 2),
+         | `string` STRING,
+         | `name` STRING,
+         | `rowtime` AS
+         | TO_TIMESTAMP(`ts`),
+         | WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '1' SECOND
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '${timestampDataId}',
+         | 'failing-source' = 'false'
+         |)
+         |""".stripMargin)
+
+    tEnv.getConfig.getConfiguration.setString(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+
+    val sql =
+      """
+        |SELECT
+        |  `name`,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*),
+        |  MAX(`double`),
+        |  COUNT(DISTINCT `string`)
+        |FROM TABLE(
+        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+        |GROUP BY `name`, window_start, window_end
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+    val stream: DataStream[Row] = t1.toAppendStream[Row]
+
+    val testHarness = createHarnessTesterForNoState(stream, "LocalWindowAggregate")
+    // window aggregate puts window properties at the end of aggs
+    val outputTypes = Array(
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.DOUBLE().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.TIMESTAMP_LTZ(3).getLogicalType,
+      DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+
+    // simulate a failover after a failed task open, expect no exception to be thrown
+    testHarness.close()
+
+    val testHarness1 = createHarnessTester(stream, "GlobalWindowAggregate")
+    testHarness1.setup(new RowDataSerializer(outputTypes: _*))
+
+    // simulate a failover after a failed task open, expect no exception to be thrown
+    testHarness1.close()
+  }
+
   /**
    * Ingests testing data, the input schema is [name, double, string, proctime].
    * We follow the test data in [[TestData.windowDataWithTimestamp]] to have the same produced
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index b044db7..63f184a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -143,7 +143,9 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
         super.close();
         collector = null;
         functionsClosed = true;
-        windowBuffer.close();
+        if (windowBuffer != null) {
+            windowBuffer.close();
+        }
     }
 
     @Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
index 4d10646..eba6e32 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractMapBundleOperator<K, V, IN, OUT> extends AbstractS
 
     @Override
     public void finishBundle() throws Exception {
-        if (!bundle.isEmpty()) {
+        if (bundle != null && !bundle.isEmpty()) {
             numOfElements = 0;
             function.finishBundle(bundle, collector);
             bundle.clear();