You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/17 13:24:40 UTC
[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for
counting GroupWindows.
[FLINK-6583] [table] Add state cleanup for counting GroupWindows.
This closes #3919.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d85d9693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d85d9693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d85d9693
Branch: refs/heads/master
Commit: d85d969334e89d83aec60f9bb3d2c69a4701eb54
Parents: 64d3ce8
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue May 16 11:58:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 17 15:24:23 2017 +0200
----------------------------------------------------------------------
.../DataStreamGroupWindowAggregate.scala | 32 +++-
.../triggers/StateCleaningCountTrigger.scala | 136 +++++++++++++++++
.../table/GroupWindowAggregationsITCase.scala | 10 +-
.../StateCleaningCountTriggerHarnessTest.scala | 147 +++++++++++++++++++
4 files changed, 316 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c158579..1ac013a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.slf4j.LoggerFactory
class DataStreamGroupWindowAggregate(
window: LogicalWindow,
@@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate(
grouping: Array[Int])
extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
override def deriveRowType(): RelDataType = schema.logicalType
override def needsUpdatesAsRetraction = true
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
"non-windowed GroupBy aggregation.")
}
+ val isCountWindow = window match {
+ case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+ case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+ case _ => false
+ }
+
+ if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+ LOG.warn(
+ "No state retention interval configured for a query which accumulates state. " +
+ "Please provide a query configuration with valid retention interval to prevent excessive " +
+ "state size. You may specify a retention time of 0 to not clean up the state.")
+ }
+
val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
val aggString = aggregationToString(
@@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate(
val keyedStream = inputDS.keyBy(physicalGrouping: _*)
val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
+ createKeyedWindowedStream(queryConfig, window, keyedStream)
.asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate(
physicalNamedProperties)
val windowedStream =
- createNonKeyedWindowedStream(window, inputDS)
+ createNonKeyedWindowedStream(queryConfig, window, inputDS)
.asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -215,6 +232,7 @@ class DataStreamGroupWindowAggregate(
object DataStreamGroupWindowAggregate {
private def createKeyedWindowedStream(
+ queryConfig: StreamQueryConfig,
groupWindow: LogicalWindow,
stream: KeyedStream[CRow, Tuple]):
WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
@@ -226,6 +244,7 @@ object DataStreamGroupWindowAggregate {
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
stream.countWindow(toLong(size))
+ .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size))));
case TumblingGroupWindow(_, timeField, size)
if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
case SlidingGroupWindow(_, timeField, size, slide)
if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
stream.countWindow(toLong(size), toLong(slide))
+ .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
case SlidingGroupWindow(_, timeField, size, slide)
if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
@@ -267,6 +287,7 @@ object DataStreamGroupWindowAggregate {
}
private def createNonKeyedWindowedStream(
+ queryConfig: StreamQueryConfig,
groupWindow: LogicalWindow,
stream: DataStream[CRow]):
AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
@@ -278,6 +299,7 @@ object DataStreamGroupWindowAggregate {
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
stream.countWindowAll(toLong(size))
+ .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size))));
case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
stream.windowAll(TumblingEventTimeWindows.of(toTime(size)))
@@ -296,6 +318,7 @@ object DataStreamGroupWindowAggregate {
case SlidingGroupWindow(_, timeField, size, slide)
if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
stream.countWindowAll(toLong(size), toLong(slide))
+ .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
case SlidingGroupWindow(_, timeField, size, slide)
if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
@@ -317,6 +340,5 @@ object DataStreamGroupWindowAggregate {
stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap)))
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
new file mode 100644
index 0000000..f3f9246
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum
+
+/**
+ * A [[Trigger]] that fires once the count of elements in a pane reaches the given count
+ * or the cleanup timer is triggered.
+ */
+class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long)
+ extends Trigger[Any, GlobalWindow] {
+
+ protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+ protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+ protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+ private val stateDesc =
+ new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
+
+ private val cleanupStateDesc =
+ new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
+
+ override def canMerge = false
+
+ override def toString: String = "CountTriggerGlobalWindowithCleanupState(" +
+ "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " +
+ "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " +
+ "maxCount=" + maxCount + ")"
+
+ override def onElement(
+ element: Any,
+ timestamp: Long,
+ window: GlobalWindow,
+ ctx: TriggerContext): TriggerResult = {
+
+ val currentTime = ctx.getCurrentProcessingTime
+
+ // register cleanup timer
+ if (stateCleaningEnabled) {
+ // last registered timer
+ val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
+
+ // check if a cleanup timer is registered and
+ // that the current cleanup timer won't delete state we need to keep
+ if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+ // we need to register a new (later) timer
+ val cleanupTime = currentTime + maxRetentionTime
+ // register timer and remember clean-up time
+ ctx.registerProcessingTimeTimer(cleanupTime)
+
+ if (null != curCleanupTime) {
+ ctx.deleteProcessingTimeTimer(curCleanupTime)
+ }
+
+ ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
+ }
+ }
+
+ val count = ctx.getPartitionedState(stateDesc)
+ count.add(1L)
+
+ if (count.get >= maxCount) {
+ count.clear()
+ TriggerResult.FIRE
+ } else {
+ TriggerResult.CONTINUE
+ }
+ }
+
+ override def onProcessingTime(
+ time: Long,
+ window: GlobalWindow,
+ ctx: TriggerContext): TriggerResult = {
+
+ if (stateCleaningEnabled) {
+ val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
+ // check that the triggered timer is the last registered processing time timer.
+ if (null != cleanupTime && time == cleanupTime) {
+ clear(window, ctx)
+ return TriggerResult.FIRE_AND_PURGE
+ }
+ }
+ TriggerResult.CONTINUE
+ }
+
+ override def onEventTime(time: Long, window: GlobalWindow, ctx: TriggerContext): TriggerResult = {
+ TriggerResult.CONTINUE
+ }
+
+ override def clear(window: GlobalWindow, ctx: TriggerContext): Unit = {
+ ctx.getPartitionedState(stateDesc).clear()
+ ctx.getPartitionedState(cleanupStateDesc).clear()
+ }
+
+}
+
+object StateCleaningCountTrigger {
+
+ /**
+ * Create a [[StateCleaningCountTrigger]] instance.
+ *
+ * @param queryConfig query configuration.
+ * @param maxCount The count of elements at which to fire.
+ */
+ def of(queryConfig: StreamQueryConfig, maxCount: Long): StateCleaningCountTrigger =
+ new StateCleaningCountTrigger(queryConfig, maxCount)
+
+ class Sum extends ReduceFunction[JLong] {
+ override def reduce(value1: JLong, value2: JLong): JLong = value1 + value2
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 846fe3e..81d3577 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -18,13 +18,14 @@
package org.apache.flink.table.api.scala.stream.table
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
@@ -41,7 +42,8 @@ import scala.collection.mutable
* programs is possible.
*/
class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
-
+ private val queryConfig = new StreamQueryConfig()
+ queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
val data = List(
(1L, 1, "Hi"),
(2L, 2, "Hello"),
@@ -68,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select('string, countFun('int), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toDataStream[Row](queryConfig)
results.addSink(new StreamITCase.StringSink)
env.execute()
@@ -136,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
.select(countFun('string), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
- val results = windowedTable.toDataStream[Row]
+ val results = windowedTable.toDataStream[Row](queryConfig)
results.addSink(new StreamITCase.StringSink)
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
new file mode 100644
index 0000000..96601fb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class StateCleaningCountTriggerHarnessTest {
+ protected var queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+ @Test
+ def testFiringAndFiringWithPurging(): Unit = {
+ val testHarness = new TriggerTestHarness[Any, GlobalWindow](
+ StateCleaningCountTrigger.of(queryConfig, 10), new GlobalWindow.Serializer)
+
+ // try to trigger onProcessingTime method via 1, but there is non timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+ // register cleanup timer with 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // try to trigger onProcessingTime method via 1000, but there is non timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(1000).size())
+
+ // 1000 + 2000 <= 3001 reuse timer 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // there are two state entries, one is timer(3001) another is counter(2)
+ assertEquals(2, testHarness.numStateEntries)
+
+ // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered
+ assertEquals(
+ TriggerResult.FIRE_AND_PURGE,
+ testHarness.advanceProcessingTime(3001).iterator().next().f1)
+
+ assertEquals(0, testHarness.numStateEntries)
+
+ // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // try to trigger onProcessingTime method via 4002, but there is non timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(4002).size())
+
+ // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // 4002 + 2000 <= 7002 reuse timer 7002
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // have one timer 7002
+ assertEquals(1, testHarness.numProcessingTimeTimers)
+ assertEquals(0, testHarness.numEventTimeTimers)
+ assertEquals(2, testHarness.numStateEntries)
+ assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
+
+ // 4002 + 2000 <= 7002 reuse timer 7002
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ assertEquals(
+ TriggerResult.FIRE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+ // counter of window() is cleared
+ assertEquals(1, testHarness.numStateEntries)
+ assertEquals(1, testHarness.numStateEntries(GlobalWindow.get))
+
+ // try to trigger onProcessingTime method via 7002, and all states are cleared
+ assertEquals(
+ TriggerResult.FIRE_AND_PURGE,
+ testHarness.advanceProcessingTime(7002).iterator().next().f1)
+
+ assertEquals(0, testHarness.numStateEntries)
+ }
+
+ /**
+ * Verify that clear() does not leak across windows.
+ */
+ @Test
+ def testClear() {
+ val testHarness = new TriggerTestHarness[Any, GlobalWindow](
+ StateCleaningCountTrigger.of(queryConfig, 3),
+ new GlobalWindow.Serializer)
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+ // have 1 timers
+ assertEquals(1, testHarness.numProcessingTimeTimers)
+ assertEquals(0, testHarness.numEventTimeTimers)
+ assertEquals(2, testHarness.numStateEntries)
+ assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
+
+ testHarness.clearTriggerState(GlobalWindow.get)
+
+ assertEquals(0, testHarness.numStateEntries)
+ assertEquals(0, testHarness.numStateEntries(GlobalWindow.get))
+ }
+}