You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/01/09 15:48:37 UTC
[flink] 04/04: [FLINK-10583][table] Add State Retention support to
TemporalProcessTimeJoin.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52ed85ae03aa4bd09c837093c56e489739c6a543
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 26 17:27:24 2018 +0100
[FLINK-10583][table] Add State Retention support to TemporalProcessTimeJoin.
This closes #6871.
---
.../runtime/join/TemporalProcessTimeJoin.scala | 26 +++++++-
.../runtime/harness/TemporalJoinHarnessTest.scala | 78 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
index 2d9fdf3..ffda1e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.{InternalTimer, TimestampedCollector}
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.Compiler
@@ -38,8 +39,7 @@ class TemporalProcessTimeJoin(
genJoinFuncName: String,
genJoinFuncCode: String,
queryConfig: StreamQueryConfig)
- extends AbstractStreamOperator[CRow]
- with TwoInputStreamOperator[CRow, CRow, CRow]
+ extends BaseTwoInputStreamOperatorWithStateRetention(queryConfig)
with Compiler[FlatJoinFunction[Row, Row, Row]]
with Logging {
@@ -70,6 +70,8 @@ class TemporalProcessTimeJoin(
collector = new TimestampedCollector[CRow](output)
cRowWrapper = new CRowWrappingCollector()
cRowWrapper.out = collector
+
+ super.open()
}
override def processElement1(element: StreamRecord[CRow]): Unit = {
@@ -82,18 +84,36 @@ class TemporalProcessTimeJoin(
cRowWrapper.setChange(element.getValue.change)
joinFunction.join(element.getValue.row, rightSideRow, cRowWrapper)
+
+ registerProcessingCleanUpTimer()
}
// Handles a record from the second input by updating or clearing `rightState`, keyed on the record's `change` flag.
override def processElement2(element: StreamRecord[CRow]): Unit = {
if (element.getValue.change) { // presumably `change == true` marks an insert/accumulate message — TODO confirm
rightState.update(element.getValue.row) // the stored row is replaced by the incoming one
+ registerProcessingCleanUpTimer() // NOTE(review): helper not defined in this view; by its name it (re)schedules the processing-time clean-up
} else {
rightState.clear() // `change == false`: the stored row is dropped
+ cleanUpLastTimer() // NOTE(review): helper not defined in this view; by its name it cancels the pending clean-up timer
}
}
// Closes the join function via FunctionUtils.closeFunction.
// NOTE(review): super.close() is not called here — confirm the base operator needs no clean-up of its own.
override def close(): Unit = {
FunctionUtils.closeFunction(joinFunction)
}
+
+ /**
+ * The method to be called when a clean-up timer fires.
+ *
+ * Clears `rightState`, i.e. the row previously stored by `processElement2`.
+ * The timestamp argument is not used by this implementation.
+ *
+ * @param time The timestamp of the fired timer.
+ */
+ override def cleanUpState(time: Long): Unit = {
+ rightState.clear()
+ }
+
+ /**
+ * Invoked when an event-time timer fires.
+ *
+ * The methods of this operator visible in this change only schedule clean-up through
+ * `registerProcessingCleanUpTimer()`, so an event-time timer is presumably never
+ * registered and this callback is not expected to run. It fails with a descriptive
+ * message instead of the `???` placeholder, whose `NotImplementedError` carries no
+ * explanation.
+ *
+ * @param timer The fired event-time timer (unused).
+ * @throws UnsupportedOperationException always.
+ */
+ override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
+   throw new UnsupportedOperationException(
+     "TemporalProcessTimeJoin does not support event-time timers; onEventTime should never be invoked.")
+ }
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
index 7b942c2..94c22e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
@@ -42,7 +42,7 @@ import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TestStreamQueryConfig}
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.hamcrest.{CoreMatchers, Matcher}
+import org.hamcrest.CoreMatchers
import org.hamcrest.Matchers.{endsWith, startsWith}
import org.junit.Assert.assertTrue
import org.junit.Test
@@ -658,6 +658,82 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
testHarness.close()
}
+ // ---------------------- Processing time TTL tests ----------------------
+
+ @Test
+ def testProcessingTimeJoinCleanupTimerUpdatedFromProbeSide(): Unit = {
+ // Checks that records on input 1 (the probe side, per the test name) postpone the state clean-up; retention is presumably min=2ms, max=4ms (configured outside this test — TODO confirm)
+ val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
+
+ testHarness.open()
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ testHarness.setProcessingTime(1L)
+
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+ // this should push the clean-up of the state further into the future
+ testHarness.setProcessingTime(4L)
+
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 114L, 0L)))
+
+ // this should do nothing to the clean-up timer
+ testHarness.setProcessingTime(5L)
+
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 8L)))
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 8L, "Euro", 114L, 0L)))
+
+ // this should trigger the clean-up of the state
+ testHarness.setProcessingTime(8L)
+
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L))) // no output is expected for this record: nothing is added to expectedOutput
+
+ verify(expectedOutput, testHarness.getOutput)
+
+ testHarness.close()
+ }
+
+ @Test
+ def testProcessingTimeJoinCleanupTimerUpdatedFromBuildSide(): Unit = {
+ // Checks that records on input 2 (the build side, per the test name) postpone the state clean-up; retention is presumably min=2ms, max=4ms (configured outside this test — TODO confirm)
+ val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
+
+ testHarness.open()
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ testHarness.setProcessingTime(1L)
+
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+ // this should push the clean-up of the state further into the future
+ testHarness.setProcessingTime(4L)
+
+ testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, 1L))) // replaces the stored rate; the next probe record is expected to join with 116L
+
+ // this should do nothing to the clean-up timer
+ testHarness.setProcessingTime(5L)
+
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
+ expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 116L, 1L)))
+
+ // this should trigger the clean-up of the state
+ testHarness.setProcessingTime(8L)
+
+ // so this record should find no match and produce no output
+ testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))
+
+ verify(expectedOutput, testHarness.getOutput)
+
+ testHarness.close()
+ }
+
def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
: (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, CRow]) = {