You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/01/09 15:48:37 UTC

[flink] 04/04: [FLINK-10583][table] Add State Retention support to TemporalProcessTimeJoin.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52ed85ae03aa4bd09c837093c56e489739c6a543
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 26 17:27:24 2018 +0100

    [FLINK-10583][table] Add State Retention support to TemporalProcessTimeJoin.
    
    This closes #6871.
---
 .../runtime/join/TemporalProcessTimeJoin.scala     | 26 +++++++-
 .../runtime/harness/TemporalJoinHarnessTest.scala  | 78 +++++++++++++++++++++-
 2 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
index 2d9fdf3..ffda1e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
@@ -22,7 +22,8 @@ import org.apache.flink.api.common.functions.util.FunctionUtils
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.{InternalTimer, TimestampedCollector}
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
@@ -38,8 +39,7 @@ class TemporalProcessTimeJoin(
     genJoinFuncName: String,
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
-  extends AbstractStreamOperator[CRow]
-  with TwoInputStreamOperator[CRow, CRow, CRow]
+  extends BaseTwoInputStreamOperatorWithStateRetention(queryConfig)
   with Compiler[FlatJoinFunction[Row, Row, Row]]
   with Logging {
 
@@ -70,6 +70,8 @@ class TemporalProcessTimeJoin(
     collector = new TimestampedCollector[CRow](output)
     cRowWrapper = new CRowWrappingCollector()
     cRowWrapper.out = collector
+
+    super.open()
   }
 
   override def processElement1(element: StreamRecord[CRow]): Unit = {
@@ -82,18 +84,36 @@ class TemporalProcessTimeJoin(
     cRowWrapper.setChange(element.getValue.change)
 
     joinFunction.join(element.getValue.row, rightSideRow, cRowWrapper)
+
+    registerProcessingCleanUpTimer()
   }
 
   override def processElement2(element: StreamRecord[CRow]): Unit = {
 
     if (element.getValue.change) {
       rightState.update(element.getValue.row)
+      registerProcessingCleanUpTimer() // row stored: presumably (re)arms the state clean-up
     } else {
       rightState.clear()
+      cleanUpLastTimer() // row dropped: presumably cancels the pending clean-up timer
     }
   }
 
   override def close(): Unit = {
     FunctionUtils.closeFunction(joinFunction)
   }
+
+  /**
+    * Called when a clean-up timer fires; drops the row held in `rightState`.
+    *
+    * @param time The timestamp of the fired timer (not used by this implementation).
+    */
+  override def cleanUpState(time: Long): Unit = {
+    rightState.clear()
+  }
+
+  /** Event-time timers are unsupported by this processing-time join; always throws. */
+  override def onEventTime(timer: InternalTimer[Any, VoidNamespace]): Unit = {
+    throw new UnsupportedOperationException("Event-time timers are not supported.")
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
index 7b942c2..94c22e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
@@ -42,7 +42,7 @@ import org.apache.flink.table.runtime.CRowKeySelector
 import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TestStreamQueryConfig}
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.hamcrest.{CoreMatchers, Matcher}
+import org.hamcrest.CoreMatchers
 import org.hamcrest.Matchers.{endsWith, startsWith}
 import org.junit.Assert.assertTrue
 import org.junit.Test
@@ -658,6 +658,82 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
     testHarness.close()
   }
 
+  // ---------------------- Processing time TTL tests ----------------------
+
+  @Test
+  def testProcessingTimeJoinCleanupTimerUpdatedFromProbeSide(): Unit = {
+    // state retention: min=2ms max=4ms
+    val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.setProcessingTime(1L)
+
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+    // the probe-side record below should push the state clean-up further out
+    testHarness.setProcessingTime(4L)
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 114L, 0L)))
+
+    // nothing should be cleaned up yet: the record below still finds its match
+    testHarness.setProcessingTime(5L)
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 8L)))
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 8L, "Euro", 114L, 0L)))
+
+    // this should clean up the state, so the record below produces no output
+    testHarness.setProcessingTime(8L)
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))
+
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testProcessingTimeJoinCleanupTimerUpdatedFromBuildSide(): Unit = {
+    // state retention: min=2ms max=4ms
+    val testHarness = createTestHarness(new OrdersRatesProctimeTemporalJoinInfo())
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.setProcessingTime(1L)
+
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+    // the build-side record below should push the state clean-up further out
+    testHarness.setProcessingTime(4L)
+
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 116L, 1L)))
+
+    // nothing should be cleaned up yet: the record below joins with the updated build-side row
+    testHarness.setProcessingTime(5L)
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 6L)))
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 6L, "Euro", 116L, 1L)))
+
+    // this should clean up the state
+    testHarness.setProcessingTime(8L)
+
+    // so this should find no match
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 10L)))
+
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
   def translateJoin(joinInfo: TemporalJoinInfo, joinRelType: JoinRelType = JoinRelType.INNER)
     : (CRowKeySelector, CRowKeySelector, TwoInputStreamOperator[CRow, CRow, CRow]) = {