You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/03/23 08:39:25 UTC

flink git commit: [FLINK-8919] [table] Add KeyedProcessFunctionWithCleanupState.

Repository: flink
Updated Branches:
  refs/heads/master 9198c93e5 -> cc94090ea


[FLINK-8919] [table] Add KeyedProcessFunctionWithCleanupState.

This closes #5680.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc94090e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc94090e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc94090e

Branch: refs/heads/master
Commit: cc94090ea29b5f33a0a110d7af46dfc2c7e8610a
Parents: 9198c93
Author: liurenjie1024 <li...@gmail.com>
Authored: Mon Mar 12 15:43:26 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 23 09:37:57 2018 +0100

----------------------------------------------------------------------
 .../KeyedProcessFunctionWithCleanupState.scala  |  85 ++++++++++++
 ...yedProcessFunctionWithCleanupStateTest.scala | 126 ++++++++++++++++++
 .../ProcessFunctionWithCleanupStateTest.scala   | 131 +++++++++++++++++++
 3 files changed, 342 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
new file mode 100644
index 0000000..4d6840a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor}
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+/**
+ * Base class for [[KeyedProcessFunction]]s that clean up idle keyed state with
+ * processing-time timers, based on the idle state retention times of a [[StreamQueryConfig]].
+ *
+ * Usage pattern: call [[initCleanupTimeState]] when the function is opened, call
+ * [[registerProcessingCleanupTimer]] whenever a key is accessed, and in `onTimer()` check
+ * [[needToCleanupState]] before clearing the function's own state via [[cleanupState]].
+ *
+ * @param queryConfig provides the minimum and maximum idle state retention times
+ * @tparam K type of the key
+ * @tparam I type of the input elements
+ * @tparam O type of the output elements
+ */
+abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: StreamQueryConfig)
+  extends KeyedProcessFunction[K, I, O] {
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  // NOTE(review): with this threshold a minimum retention time of exactly 1 ms also
+  // disables cleanup; confirm whether `> 0` was intended.
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // holds the latest registered cleanup timer; only initialized if state cleaning is enabled
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  /**
+   * Creates the keyed value state that remembers the latest registered cleanup timer.
+   * Does nothing if state cleaning is disabled.
+   *
+   * @param stateName name of the value state that holds the cleanup timestamp
+   */
+  protected def initCleanupTimeState(stateName: String): Unit = {
+    if (stateCleaningEnabled) {
+      val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+    }
+  }
+
+  /**
+   * Registers a processing-time cleanup timer at `currentTime + maxRetentionTime` for the
+   * current key, unless the already registered timer still keeps the state for at least
+   * `minRetentionTime`. Does nothing if state cleaning is disabled.
+   *
+   * @param ctx         context used to register the timer
+   * @param currentTime current processing time
+   */
+  protected def registerProcessingCleanupTimer(
+    ctx: KeyedProcessFunction[K, I, O]#Context,
+    currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      // last registered timer
+      val curCleanupTime = cleanupTimeState.value()
+
+      // check if a cleanup timer is registered and
+      // that the current cleanup timer won't delete state we need to keep
+      if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+        // we need to register a new (later) timer
+        val cleanupTime = currentTime + maxRetentionTime
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  /**
+   * Returns true if the timer that fired belongs to the processing-time domain.
+   *
+   * The parameter uses the type projection, like [[registerProcessingCleanupTimer]], so
+   * `onTimer()` implementations can pass their context without a cast.
+   *
+   * @param ctx context of the fired timer
+   */
+  protected def isProcessingTimeTimer(ctx: KeyedProcessFunction[K, I, O]#OnTimerContext): Boolean = {
+    ctx.timeDomain() == TimeDomain.PROCESSING_TIME
+  }
+
+  /**
+   * Returns true if state cleaning is enabled and `timestamp` equals the latest registered
+   * cleanup timer of the current key. Earlier, superseded timers therefore return false.
+   *
+   * @param timestamp timestamp of the fired timer
+   */
+  protected def needToCleanupState(timestamp: Long): Boolean = {
+    if (stateCleaningEnabled) {
+      val cleanupTime = cleanupTimeState.value()
+      // check that the triggered timer is the last registered processing time timer.
+      null != cleanupTime && timestamp == cleanupTime
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Clears the given states and, if state cleaning is enabled, the cleanup timestamp state.
+   *
+   * @param states the function's own states to clear for the current key
+   */
+  protected def cleanupState(states: State*): Unit = {
+    // clear all state
+    states.foreach(_.clear())
+    if (stateCleaningEnabled) {
+      this.cleanupTimeState.clear()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
new file mode 100644
index 0000000..c896666
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+/**
+ * Checks that [[KeyedProcessFunctionWithCleanupState]] clears idle keyed state when the
+ * latest processing-time cleanup timer of a key fires (min/max retention: 5 ms / 10 ms).
+ */
+class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+
+  @Test
+  def testStateCleaning(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+
+    val func = new MockedKeyedProcessFunction(queryConfig)
+    val operator = new KeyedProcessOperator(func)
+
+    val testHarness = createHarnessTester(operator,
+      new FirstFieldSelector,
+      TypeInformation.of(classOf[String]))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    // add state for key "a" (cleanup timer at 1 + 10 = 11)
+    testHarness.processElement(("a", "payload"), 1)
+    // add state for key "b" (cleanup timer at 1 + 10 = 11)
+    testHarness.processElement(("b", "payload"), 1)
+
+    // check that we have two states (a, b)
+    // we check for the double number of states, because KeyedProcessFunctionWithCleanupState
+    //   adds one more state per key to hold the cleanup timestamp.
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time and add state for key "c" (cleanup timer at 5 + 10 = 15)
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(("c", "payload"), 1)
+    // update state for key "a". Timer is not reset, because 5 + minRetentionTime (10) does
+    //   not exceed the registered timer (11)
+    testHarness.processElement(("a", "payload"), 1)
+
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time and update key "b". Timer for "b" is reset to 18, because 8 + 5 > 11
+    testHarness.setProcessingTime(8)
+    testHarness.processElement(("b", "payload"), 1)
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "a" (the superseded timer of "b" is ignored)
+    testHarness.setProcessingTime(11)
+    // check that we have two states (b, c)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "c"
+    testHarness.setProcessingTime(15)
+    // check that we have one state (b)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "b"
+    testHarness.setProcessingTime(18)
+    // check that we have no states
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.close()
+  }
+}
+
+/**
+ * Test function that stores the latest payload per key in a value state and drops that state,
+ * together with the cleanup-timestamp state, when the key's current cleanup timer fires.
+ */
+private class MockedKeyedProcessFunction(queryConfig: StreamQueryConfig)
+    extends KeyedProcessFunctionWithCleanupState[String, (String, String), String](queryConfig) {
+
+  var state: ValueState[String] = _
+
+  override def open(parameters: Configuration): Unit = {
+    // the cleanup-timestamp state is set up before the payload state
+    initCleanupTimeState("CleanUpState")
+    state = getRuntimeContext.getState(
+      new ValueStateDescriptor[String]("testState", classOf[String]))
+  }
+
+  override def processElement(
+      value: (String, String),
+      ctx: KeyedProcessFunction[String, (String, String), String]#Context,
+      out: Collector[String]): Unit = {
+    // (re-)arm the cleanup timer relative to the current processing time
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+    val (_, payload) = value
+    state.update(payload)
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext,
+      out: Collector[String]): Unit =
+    // only the most recently registered cleanup timer clears the state
+    if (needToCleanupState(timestamp)) cleanupState(state)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
new file mode 100644
index 0000000..e773f4b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Checks that ProcessFunctionWithCleanupState clears idle keyed state when the latest
+ * processing-time cleanup timer of a key fires (min/max retention: 5 ms / 10 ms).
+ */
+class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+
+  @Test
+  def testStateCleaning(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+
+    val func = new MockedProcessFunction(queryConfig)
+    val operator = new LegacyKeyedProcessOperator(func)
+
+    val testHarness = createHarnessTester(operator,
+      new FirstFieldSelector,
+      TypeInformation.of(classOf[String]))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    // add state for key "a" (cleanup timer at 1 + 10 = 11)
+    testHarness.processElement(("a", "payload"), 1)
+    // add state for key "b" (cleanup timer at 1 + 10 = 11)
+    testHarness.processElement(("b", "payload"), 1)
+
+    // check that we have two states (a, b)
+    // we check for the double number of states, because ProcessFunctionWithCleanupState
+    //   adds one more state per key to hold the cleanup timestamp.
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time and add state for key "c" (cleanup timer at 5 + 10 = 15)
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(("c", "payload"), 1)
+    // update state for key "a". Timer is not reset, because 5 + minRetentionTime (10) does
+    //   not exceed the registered timer (11)
+    testHarness.processElement(("a", "payload"), 1)
+
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time and update key "b". Timer for "b" is reset to 18, because 8 + 5 > 11
+    testHarness.setProcessingTime(8)
+    testHarness.processElement(("b", "payload"), 1)
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "a" (the superseded timer of "b" is ignored)
+    testHarness.setProcessingTime(11)
+    // check that we have two states (b, c)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "c"
+    testHarness.setProcessingTime(15)
+    // check that we have one state (b)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "b"
+    testHarness.setProcessingTime(18)
+    // check that we have no states
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.close()
+  }
+}
+
+/**
+ * Test function that stores the latest payload per key in a value state and drops that state,
+ * together with the cleanup-timestamp state, when the key's current cleanup timer fires.
+ */
+private class MockedProcessFunction(queryConfig: StreamQueryConfig)
+    extends ProcessFunctionWithCleanupState[(String, String), String](queryConfig) {
+
+  var state: ValueState[String] = _
+
+  override def open(parameters: Configuration): Unit = {
+    // the cleanup-timestamp state is set up before the payload state
+    initCleanupTimeState("CleanUpState")
+    state = getRuntimeContext.getState(
+      new ValueStateDescriptor[String]("testState", classOf[String]))
+  }
+
+  override def processElement(
+      value: (String, String),
+      ctx: ProcessFunction[(String, String), String]#Context,
+      out: Collector[String]): Unit = {
+    // (re-)arm the cleanup timer relative to the current processing time
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+    val (_, payload) = value
+    state.update(payload)
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[(String, String), String]#OnTimerContext,
+      out: Collector[String]): Unit =
+    // only the most recently registered cleanup timer clears the state
+    if (needToCleanupState(timestamp)) cleanupState(state)
+}
+
+/** Keys a `(key, payload)` pair by its first field. */
+private class FirstFieldSelector extends KeySelector[(String, String), String] {
+  override def getKey(value: (String, String)): String = {
+    val (key, _) = value
+    key
+  }
+}
+