You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/17 13:24:40 UTC

[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for counting GroupWindows.

[FLINK-6583] [table] Add state cleanup for counting GroupWindows.

This closes #3919.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d85d9693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d85d9693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d85d9693

Branch: refs/heads/master
Commit: d85d969334e89d83aec60f9bb3d2c69a4701eb54
Parents: 64d3ce8
Author: sunjincheng121 <su...@gmail.com>
Authored: Tue May 16 11:58:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 17 15:24:23 2017 +0200

----------------------------------------------------------------------
 .../DataStreamGroupWindowAggregate.scala        |  32 +++-
 .../triggers/StateCleaningCountTrigger.scala    | 136 +++++++++++++++++
 .../table/GroupWindowAggregationsITCase.scala   |  10 +-
 .../StateCleaningCountTriggerHarnessTest.scala  | 147 +++++++++++++++++++
 4 files changed, 316 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c158579..1ac013a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.slf4j.LoggerFactory
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate(
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType(): RelDataType = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
           "non-windowed GroupBy aggregation.")
     }
 
+    val isCountWindow = window match {
+      case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+      case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+      case _ => false
+    }
+
+    if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent excessive " +
+        "state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
     val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
 
     val aggString = aggregationToString(
@@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate(
 
       val keyedStream = inputDS.keyBy(physicalGrouping: _*)
       val windowedStream =
-        createKeyedWindowedStream(window, keyedStream)
+        createKeyedWindowedStream(queryConfig, window, keyedStream)
           .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate(
         physicalNamedProperties)
 
       val windowedStream =
-        createNonKeyedWindowedStream(window, inputDS)
+        createNonKeyedWindowedStream(queryConfig, window, inputDS)
           .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -215,6 +232,7 @@ class DataStreamGroupWindowAggregate(
 object DataStreamGroupWindowAggregate {
 
   private def createKeyedWindowedStream(
+      queryConfig: StreamQueryConfig,
       groupWindow: LogicalWindow,
       stream: KeyedStream[CRow, Tuple]):
     WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
@@ -226,6 +244,7 @@ object DataStreamGroupWindowAggregate {
     case TumblingGroupWindow(_, timeField, size)
         if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
       stream.countWindow(toLong(size))
+      .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size))));
 
     case TumblingGroupWindow(_, timeField, size)
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
     case SlidingGroupWindow(_, timeField, size, slide)
         if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
       stream.countWindow(toLong(size), toLong(slide))
+      .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
 
     case SlidingGroupWindow(_, timeField, size, slide)
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
@@ -267,6 +287,7 @@ object DataStreamGroupWindowAggregate {
   }
 
   private def createNonKeyedWindowedStream(
+      queryConfig: StreamQueryConfig,
       groupWindow: LogicalWindow,
       stream: DataStream[CRow]):
     AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
@@ -278,6 +299,7 @@ object DataStreamGroupWindowAggregate {
     case TumblingGroupWindow(_, timeField, size)
         if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
       stream.countWindowAll(toLong(size))
+      .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size))));
 
     case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
       stream.windowAll(TumblingEventTimeWindows.of(toTime(size)))
@@ -296,6 +318,7 @@ object DataStreamGroupWindowAggregate {
     case SlidingGroupWindow(_, timeField, size, slide)
         if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
       stream.countWindowAll(toLong(size), toLong(slide))
+      .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
 
     case SlidingGroupWindow(_, timeField, size, slide)
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
@@ -317,6 +340,5 @@ object DataStreamGroupWindowAggregate {
       stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap)))
   }
 
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
new file mode 100644
index 0000000..f3f9246
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum
+
+/**
+  * A [[Trigger]] for [[GlobalWindow]]s that fires once the pane holds `maxCount` elements.
+  * If idle state retention is configured, a processing-time cleanup timer fires and purges state.
+  */
+class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long)
+  extends Trigger[Any, GlobalWindow] {
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val stateDesc =
+    new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
+
+  private val cleanupStateDesc =
+    new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
+
+  override def canMerge: Boolean = false
+
+  override def toString: String = "StateCleaningCountTrigger(" +
+    s"minIdleStateRetentionTime=$minRetentionTime, " +
+    s"maxIdleStateRetentionTime=$maxRetentionTime, " +
+    s"maxCount=$maxCount)"
+
+  override def onElement(
+      element: Any,
+      timestamp: Long,
+      window: GlobalWindow,
+      ctx: TriggerContext): TriggerResult = {
+
+    val currentTime = ctx.getCurrentProcessingTime
+
+    // register (or move forward) the processing-time cleanup timer
+    if (stateCleaningEnabled) {
+      // time of the last registered cleanup timer, or null if none is registered
+      val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
+
+      // a new timer is needed if none exists or if the current one would fire before the
+      // minimum retention time has passed, i.e., it would delete state we still need to keep
+      if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+        // the new (later) timer fires after the maximum retention time
+        val cleanupTime = currentTime + maxRetentionTime
+        // register timer and remember clean-up time
+        ctx.registerProcessingTimeTimer(cleanupTime)
+
+        if (null != curCleanupTime) {
+          ctx.deleteProcessingTimeTimer(curCleanupTime)
+        }
+
+        ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
+      }
+    }
+
+    val count = ctx.getPartitionedState(stateDesc)
+    count.add(1L)
+
+    if (count.get >= maxCount) {
+      count.clear()
+      TriggerResult.FIRE
+    } else {
+      TriggerResult.CONTINUE
+    }
+  }
+
+  override def onProcessingTime(
+      time: Long,
+      window: GlobalWindow,
+      ctx: TriggerContext): TriggerResult = {
+
+    val cleanupTime: JLong =
+      if (stateCleaningEnabled) ctx.getPartitionedState(cleanupStateDesc).value() else null
+    // react only to the most recently registered cleanup timer
+    if (null != cleanupTime && time == cleanupTime) {
+      clear(window, ctx)
+      TriggerResult.FIRE_AND_PURGE
+    } else {
+      TriggerResult.CONTINUE
+    }
+  }
+
+  override def onEventTime(time: Long, window: GlobalWindow, ctx: TriggerContext): TriggerResult = {
+    TriggerResult.CONTINUE
+  }
+
+  override def clear(window: GlobalWindow, ctx: TriggerContext): Unit = {
+    ctx.getPartitionedState(stateDesc).clear()
+    ctx.getPartitionedState(cleanupStateDesc).clear()
+  }
+
+}
+
+object StateCleaningCountTrigger {
+
+  /**
+    * Creates a [[StateCleaningCountTrigger]] for the given configuration and element count.
+    *
+    * @param queryConfig query configuration that supplies the idle state retention times.
+    * @param maxCount number of elements after which the trigger fires.
+    */
+  def of(queryConfig: StreamQueryConfig, maxCount: Long): StateCleaningCountTrigger =
+    new StateCleaningCountTrigger(queryConfig = queryConfig, maxCount = maxCount)
+
+  /** [[ReduceFunction]] that adds up the element counts kept in reducing state. */
+  class Sum extends ReduceFunction[JLong] {
+    override def reduce(value1: JLong, value2: JLong): JLong = value1.longValue + value2.longValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 846fe3e..81d3577 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -18,13 +18,14 @@
 
 package org.apache.flink.table.api.scala.stream.table
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
@@ -41,7 +42,8 @@ import scala.collection.mutable
   * programs is possible.
   */
 class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
-
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
   val data = List(
     (1L, 1, "Hi"),
     (2L, 2, "Hello"),
@@ -68,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toDataStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -136,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       .select(countFun('string), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toDataStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
new file mode 100644
index 0000000..96601fb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class StateCleaningCountTriggerHarnessTest {
+  private val queryConfig =
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+  @Test
+  def testFiringAndFiringWithPurging(): Unit = {
+    val testHarness = new TriggerTestHarness[Any, GlobalWindow](
+      StateCleaningCountTrigger.of(queryConfig, 10), new GlobalWindow.Serializer)
+
+    // advancing processing time to 1 fires nothing: no timer is registered yet
+    assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+    // the first element at time 1 registers a cleanup timer at 1 + 3000 = 3001
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // advancing processing time to 1000 fires nothing: the cleanup timer is at 3001
+    assertEquals(0, testHarness.advanceProcessingTime(1000).size())
+
+    // 1000 + 2000 <= 3001, so the existing cleanup timer (3001) is reused
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // two state entries: the cleanup time (3001) and the counter (2)
+    assertEquals(2, testHarness.numStateEntries)
+
+    // advancing processing time to 3001 fires the cleanup timer, which purges all state
+    assertEquals(
+      TriggerResult.FIRE_AND_PURGE,
+      testHarness.advanceProcessingTime(3001).iterator().next().f1)
+
+    assertEquals(0, testHarness.numStateEntries)
+
+    // state was purged, so a new cleanup timer is registered at 3001 + 3000 = 6001
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // advancing processing time to 4002 fires nothing: the cleanup timer is at 6001
+    assertEquals(0, testHarness.advanceProcessingTime(4002).size())
+
+    // 4002 + 2000 > 6001, so a cleanup timer at 7002 is registered and 6001 is deleted
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // 4002 + 2000 <= 7002, so the cleanup timer 7002 is reused
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // only the cleanup timer 7002 is registered
+    assertEquals(1, testHarness.numProcessingTimeTimers)
+    assertEquals(0, testHarness.numEventTimeTimers)
+    assertEquals(2, testHarness.numStateEntries)
+    assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
+
+    // timer 7002 keeps being reused; the 10th element of this pane reaches maxCount and fires
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    assertEquals(
+      TriggerResult.FIRE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+
+    // the counter is cleared after firing; only the cleanup time entry remains
+    assertEquals(1, testHarness.numStateEntries)
+    assertEquals(1, testHarness.numStateEntries(GlobalWindow.get))
+
+    // advancing processing time to 7002 fires the cleanup timer, which purges all state
+    assertEquals(
+      TriggerResult.FIRE_AND_PURGE,
+      testHarness.advanceProcessingTime(7002).iterator().next().f1)
+
+    assertEquals(0, testHarness.numStateEntries)
+  }
+
+  /**
+    * Verify that clear() removes both the counter and the cleanup time state.
+    */
+  @Test
+  def testClear(): Unit = {
+    val testHarness = new TriggerTestHarness[Any, GlobalWindow](
+      StateCleaningCountTrigger.of(queryConfig, 3),
+      new GlobalWindow.Serializer)
+    assertEquals(
+      TriggerResult.CONTINUE,
+      testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
+    // one cleanup timer is registered
+    assertEquals(1, testHarness.numProcessingTimeTimers)
+    assertEquals(0, testHarness.numEventTimeTimers)
+    assertEquals(2, testHarness.numStateEntries)
+    assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
+
+    testHarness.clearTriggerState(GlobalWindow.get)
+
+    assertEquals(0, testHarness.numStateEntries)
+    assertEquals(0, testHarness.numStateEntries(GlobalWindow.get))
+  }
+}