You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/06 09:34:43 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #10763: [FLINK-14200][table] Fix NPE for Temporal Table Function Join when left side is a query instead of a source

dawidwys commented on a change in pull request #10763: [FLINK-14200][table] Fix NPE for Temporal Table Function Join when left side is a query instead of a source
URL: https://github.com/apache/flink/pull/10763#discussion_r363209651
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
 ##########
 @@ -158,11 +160,87 @@ class TemporalJoinITCase(state: StateBackendMode)
 
     assertEquals(expectedOutput, sink.getAppendResults.toSet)
   }
+
+  @Test
+  def testNestedTemporalJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val sqlQuery =
+      """
+        |SELECT
+        |  o.amount, r.rate, p.price
+        |FROM
+        |  Orders AS o,
+        |  LATERAL TABLE (Rates(o.rowtime)) AS r,
+        |  LATERAL TABLE (Prices(o.rowtime)) AS p
+        |WHERE r.currency = o.currency AND p.productId = o.productId
+        |""".stripMargin
+
+    val ordersData = new mutable.MutableList[(Long, String, String, Timestamp)]
+    ordersData.+=((2L, "A1", "Euro", new Timestamp(2L)))
+    ordersData.+=((1L, "A2", "US Dollar", new Timestamp(3L)))
+    ordersData.+=((50L, "A4", "Yen", new Timestamp(4L)))
+    ordersData.+=((3L, "A2", "Euro", new Timestamp(5L)))
+
+    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
+    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
+
+    val pricesHistoryData = new mutable.MutableList[(String, Double, Timestamp)]
+    pricesHistoryData.+=(("A2", 10.2D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A1", 11.4D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A4", 1D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A1", 11.6D, new Timestamp(5L)))
+    pricesHistoryData.+=(("A1", 11.9D, new Timestamp(7L)))
+
+    val orders = env
+      .fromCollection(ordersData)
+      .asInstanceOf[DataStream[Product]]
 
 Review comment:
   How about fixing the generic type of `TimestampExtractor` instead of casting the stream with `asInstanceOf[DataStream[Product]]`? For example:
   
   ```
         .fromCollection(ordersData)
         .assignTimestampsAndWatermarks(new TimestampExtractor[(Long, String, String, Timestamp)]())
   
   
   class TimestampExtractor[T <: Product]
     extends BoundedOutOfOrdernessTimestampExtractor[T]
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services