You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/03/23 08:39:25 UTC
flink git commit: [FLINK-8919] [table] Add KeyedProcessFunctionWithCleanupState.
Repository: flink
Updated Branches:
refs/heads/master 9198c93e5 -> cc94090ea
[FLINK-8919] [table] Add KeyedProcessFunctionWithCleanupState.
This closes #5680.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc94090e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc94090e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc94090e
Branch: refs/heads/master
Commit: cc94090ea29b5f33a0a110d7af46dfc2c7e8610a
Parents: 9198c93
Author: liurenjie1024 <li...@gmail.com>
Authored: Mon Mar 12 15:43:26 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 23 09:37:57 2018 +0100
----------------------------------------------------------------------
.../KeyedProcessFunctionWithCleanupState.scala | 85 ++++++++++++
...yedProcessFunctionWithCleanupStateTest.scala | 126 ++++++++++++++++++
.../ProcessFunctionWithCleanupStateTest.scala | 131 +++++++++++++++++++
3 files changed, 342 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
new file mode 100644
index 0000000..4d6840a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor}
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+/**
+ * Base class for [[KeyedProcessFunction]]s whose keyed state should be cleared after the
+ * key has been idle for a while.
+ *
+ * The idle retention interval is read from the [[StreamQueryConfig]]. For every key the
+ * timestamp of the latest registered processing-time cleanup timer is kept in an additional
+ * [[ValueState]], so that only that latest timer triggers the cleanup.
+ *
+ * @param queryConfig provides the min/max idle state retention times
+ * @tparam K type of the key
+ * @tparam I type of the input elements
+ * @tparam O type of the output elements
+ */
+abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: StreamQueryConfig)
+  extends KeyedProcessFunction[K, I, O] {
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  // NOTE(review): a minRetentionTime of exactly 1 ms also leaves cleaning disabled;
+  // confirm whether `> 0` was intended.
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // holds the latest registered cleanup timer; only initialized when cleaning is enabled
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  /**
+   * Registers the keyed state that remembers the latest cleanup timer.
+   *
+   * When cleaning is enabled this must be called (e.g. from `open()`) before any other
+   * cleanup method, because those methods read the state registered here.
+   *
+   * @param stateName name of the state holding the cleanup timestamp
+   */
+  protected def initCleanupTimeState(stateName: String): Unit = {
+    if (stateCleaningEnabled) {
+      val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+    }
+  }
+
+  /**
+   * Registers a processing-time cleanup timer at `currentTime + maxRetentionTime` for the
+   * current key.
+   *
+   * No new timer is registered if one is already registered that fires no earlier than
+   * `currentTime + minRetentionTime`. Does nothing if state cleaning is disabled.
+   *
+   * @param ctx context of the current `processElement` call
+   * @param currentTime the current processing time
+   */
+  protected def registerProcessingCleanupTimer(
+      ctx: KeyedProcessFunction[K, I, O]#Context,
+      currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      // last registered timer
+      val curCleanupTime = cleanupTimeState.value()
+
+      // check if a cleanup timer is registered and
+      // that the current cleanup timer won't delete state we need to keep
+      if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
+        // we need to register a new (later) timer
+        val cleanupTime = currentTime + maxRetentionTime
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+
+  /**
+   * Returns whether the firing timer is a processing-time timer.
+   *
+   * The parameter is typed as the projection `KeyedProcessFunction[K, I, O]#OnTimerContext`,
+   * so the context received by `onTimer` can be passed directly without a cast.
+   *
+   * @param ctx context of the current `onTimer` call
+   */
+  protected def isProcessingTimeTimer(ctx: KeyedProcessFunction[K, I, O]#OnTimerContext): Boolean = {
+    ctx.timeDomain() == TimeDomain.PROCESSING_TIME
+  }
+
+  /**
+   * Returns true iff state cleaning is enabled and `timestamp` equals the latest registered
+   * cleanup timer of the current key. Superseded (earlier) timers return false.
+   *
+   * @param timestamp timestamp of the firing timer
+   */
+  protected def needToCleanupState(timestamp: Long): Boolean = {
+    if (stateCleaningEnabled) {
+      val cleanupTime = cleanupTimeState.value()
+      // check that the triggered timer is the last registered processing time timer.
+      null != cleanupTime && timestamp == cleanupTime
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Clears all given states of the current key and, if cleaning is enabled, also the state
+   * holding the cleanup timestamp.
+   *
+   * @param states the user states to clear
+   */
+  protected def cleanupState(states: State*): Unit = {
+    // clear all state
+    states.foreach(_.clear())
+    if (stateCleaningEnabled) {
+      cleanupTimeState.clear()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
new file mode 100644
index 0000000..c896666
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+/**
+ * Harness test for [[KeyedProcessFunctionWithCleanupState]].
+ *
+ * It checks that per-key state is cleared when the key's latest cleanup timer fires. The
+ * query config uses minRetentionTime = 5 ms and maxRetentionTime = 10 ms. A timer
+ * registered at time t therefore fires at t + 10. It is only re-registered by an element
+ * arriving at t' when t' + 5 exceeds the currently registered cleanup time.
+ */
+class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+
+  @Test
+  def testStateCleaning(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+
+    val func = new MockedKeyedProcessFunction(queryConfig)
+    val operator = new KeyedProcessOperator(func)
+
+    val testHarness = createHarnessTester(operator,
+      new FirstFieldSelector,
+      TypeInformation.of(classOf[String]))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    // add state for key "a"; its cleanup timer is registered at 1 + 10 = 11
+    testHarness.processElement(("a", "payload"), 1)
+    // add state for key "b"; its cleanup timer is registered at 1 + 10 = 11
+    testHarness.processElement(("b", "payload"), 1)
+
+    // check that we have two states (a, b)
+    // we check for the double number of states, because KeyedProcessFunctionWithCleanupState
+    // adds one more state per key to hold the cleanup timestamp.
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time and add state for key "c"; its cleanup timer is registered at 15
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(("c", "payload"), 1)
+    // add state for key "a". Timer is not reset, because it is still within minRetentionTime
+    // (5 + 5 = 10 does not exceed the registered cleanup time 11)
+    testHarness.processElement(("a", "payload"), 1)
+
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time and update key "b". Timer for "b" is reset to 18 (8 + 5 > 11)
+    testHarness.setProcessingTime(8)
+    testHarness.processElement(("b", "payload"), 1)
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "a". The stale timer of "b" at 11 fires as well,
+    // but is ignored because it is no longer the latest cleanup timer of "b".
+    testHarness.setProcessingTime(11)
+    // check that we have two states (b, c)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "c"
+    testHarness.setProcessingTime(15)
+    // check that we have one state (b)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "b"
+    testHarness.setProcessingTime(18)
+    // check that we have no states
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.close()
+  }
+}
+
+/**
+ * Minimal [[KeyedProcessFunctionWithCleanupState]] for the test.
+ *
+ * Each element's payload is written into keyed state and a processing-time cleanup timer
+ * is (re-)registered. On a firing timer the state is cleared if that timer is the key's
+ * latest cleanup timer.
+ */
+private class MockedKeyedProcessFunction(queryConfig: StreamQueryConfig)
+  extends KeyedProcessFunctionWithCleanupState[String, (String, String), String](queryConfig) {
+
+  var state: ValueState[String] = _
+
+  override def open(parameters: Configuration): Unit = {
+    initCleanupTimeState("CleanUpState")
+    state = getRuntimeContext.getState(
+      new ValueStateDescriptor[String]("testState", classOf[String]))
+  }
+
+  override def processElement(
+      value: (String, String),
+      ctx: KeyedProcessFunction[String, (String, String), String]#Context,
+      out: Collector[String]): Unit = {
+    // schedule (or keep) the idle-state cleanup timer relative to "now"
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+    val payload = value._2
+    state.update(payload)
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext,
+      out: Collector[String]): Unit =
+    if (needToCleanupState(timestamp)) cleanupState(state)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc94090e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
new file mode 100644
index 0000000..e773f4b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.util.Collector
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Harness test for [[ProcessFunctionWithCleanupState]].
+ *
+ * It checks that per-key state is cleared when the key's latest cleanup timer fires. The
+ * query config uses minRetentionTime = 5 ms and maxRetentionTime = 10 ms. A timer
+ * registered at time t therefore fires at t + 10. It is only re-registered by an element
+ * arriving at t' when t' + 5 exceeds the currently registered cleanup time.
+ */
+class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
+
+  @Test
+  def testStateCleaning(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+
+    val func = new MockedProcessFunction(queryConfig)
+    val operator = new LegacyKeyedProcessOperator(func)
+
+    val testHarness = createHarnessTester(operator,
+      new FirstFieldSelector,
+      TypeInformation.of(classOf[String]))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    // add state for key "a"; its cleanup timer is registered at 1 + 10 = 11
+    testHarness.processElement(("a", "payload"), 1)
+    // add state for key "b"; its cleanup timer is registered at 1 + 10 = 11
+    testHarness.processElement(("b", "payload"), 1)
+
+    // check that we have two states (a, b)
+    // we check for the double number of states, because ProcessFunctionWithCleanupState
+    // adds one more state per key to hold the cleanup timestamp.
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time and add state for key "c"; its cleanup timer is registered at 15
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(("c", "payload"), 1)
+    // add state for key "a". Timer is not reset, because it is still within minRetentionTime
+    // (5 + 5 = 10 does not exceed the registered cleanup time 11)
+    testHarness.processElement(("a", "payload"), 1)
+
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time and update key "b". Timer for "b" is reset to 18 (8 + 5 > 11)
+    testHarness.setProcessingTime(8)
+    testHarness.processElement(("b", "payload"), 1)
+    // check that we have three states (a, b, c)
+    assertEquals(6, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "a". The stale timer of "b" at 11 fires as well,
+    // but is ignored because it is no longer the latest cleanup timer of "b".
+    testHarness.setProcessingTime(11)
+    // check that we have two states (b, c)
+    assertEquals(4, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "c"
+    testHarness.setProcessingTime(15)
+    // check that we have one state (b)
+    assertEquals(2, testHarness.numKeyedStateEntries())
+
+    // advance time to clear state for key "b"
+    testHarness.setProcessingTime(18)
+    // check that we have no states
+    assertEquals(0, testHarness.numKeyedStateEntries())
+
+    testHarness.close()
+  }
+}
+
+/**
+ * Minimal [[ProcessFunctionWithCleanupState]] for the test.
+ *
+ * Each element's payload is written into keyed state and a processing-time cleanup timer
+ * is (re-)registered. On a firing timer the state is cleared if that timer is the key's
+ * latest cleanup timer.
+ */
+private class MockedProcessFunction(queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[(String, String), String](queryConfig) {
+
+  var state: ValueState[String] = _
+
+  override def open(parameters: Configuration): Unit = {
+    initCleanupTimeState("CleanUpState")
+    state = getRuntimeContext.getState(
+      new ValueStateDescriptor[String]("testState", classOf[String]))
+  }
+
+  override def processElement(
+      value: (String, String),
+      ctx: ProcessFunction[(String, String), String]#Context,
+      out: Collector[String]): Unit = {
+    // schedule (or keep) the idle-state cleanup timer relative to "now"
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+    val payload = value._2
+    state.update(payload)
+  }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[(String, String), String]#OnTimerContext,
+      out: Collector[String]): Unit =
+    if (needToCleanupState(timestamp)) cleanupState(state)
+}
+
+/** Key selector for the harness tests: keys each (String, String) tuple by its first field. */
+private class FirstFieldSelector extends KeySelector[(String, String), String] {
+  override def getKey(value: (String, String)): String = {
+    val key = value._1
+    key
+  }
+}
+