You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/27 13:07:48 UTC

[flink] branch master updated: [FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 367ad8b  [FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala
367ad8b is described below

commit 367ad8b8a00c3324fed342e8924e8df65438b06d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 24 09:21:27 2020 +0200

    [FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala
    
    This closes #12978.
---
 .../flink/table/examples/scala/StreamSQLExample.scala      |  4 ++--
 .../apache/flink/table/examples/scala/WordCountTable.scala |  6 +++---
 .../flink/table/api/ImplicitExpressionConversions.scala    | 14 ++++++++++++++
 .../flink/table/api/ExpressionsConsistencyCheckTest.scala  | 11 +++++++----
 4 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index f979514..6035347 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -77,9 +77,9 @@ object StreamSQLExample {
       Order(4L, "beer", 1)))
 
     // convert DataStream to Table
-    val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
+    val tableA = tEnv.fromDataStream(orderA, $"user", $"product", $"amount")
     // register DataStream as Table
-    tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
+    tEnv.createTemporaryView("OrderB", orderB, $"user", $"product", $"amount")
 
     // union the two tables
     val result = tEnv.sqlQuery(
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
index a869d4d..7f667ce 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
@@ -45,9 +45,9 @@ object WordCountTable {
     val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
     val expr = input.toTable(tEnv)
     val result = expr
-      .groupBy('word)
-      .select('word, 'frequency.sum as 'frequency)
-      .filter('frequency === 2)
+      .groupBy($"word")
+      .select($"word", $"frequency".sum as "frequency")
+      .filter($"frequency" === 2)
       .toDataSet[WC]
 
     result.print()
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 540d233..23e45a3 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -364,6 +364,20 @@ trait ImplicitExpressionConversions {
   // ----------------------------------------------------------------------------------------------
 
   /**
+   * Creates an unresolved reference to a table's field.
+   *
+   * For example:
+   *
+   * ```
+   * tab.select($("key"), $("value"))
+   * ```
+   *
+   * This method is useful in cases where the field name is calculated and the recommended way of
+   * using string interpolation like `$"key"` would be inconvenient.
+   */
+  def $(name: String): Expression = Expressions.$(name)
+
+  /**
    * Creates a SQL literal.
    *
    * The data type is derived from the object's class and its value.
diff --git a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
index c3699ad..e9ef414 100644
--- a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
+++ b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.api
 
-import org.apache.flink.table.api.Expressions._
 import org.apache.flink.table.expressions.ApiExpressionUtils._
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, PLUS, TRIM}
@@ -253,7 +252,7 @@ class ExpressionsConsistencyCheckTest {
   def testInteroperability(): Unit = {
     // In most cases it should be just fine to mix the two APIs.
     // It should be discouraged though as it might have unforeseen side effects
-    val expr = lit("ABC") === $"f0".plus($("f1")).trim()
+    val expr = lit("ABC") === $"f0".plus(Expressions.$("f1")).plus($("f2")).trim()
 
     assertThat(
       expr,
@@ -268,8 +267,12 @@ class ExpressionsConsistencyCheckTest {
             valueLiteral(" "),
             unresolvedCall(
               PLUS,
-              unresolvedRef("f0"),
-              unresolvedRef("f1")
+              unresolvedCall(
+              PLUS,
+                unresolvedRef("f0"),
+                unresolvedRef("f1")
+              ),
+              unresolvedRef("f2")
             )
           )
         )