You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/27 13:07:48 UTC
[flink] branch master updated: [FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 367ad8b [FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala
367ad8b is described below
commit 367ad8b8a00c3324fed342e8924e8df65438b06d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jul 24 09:21:27 2020 +0200
[FLINK-18699][table-api-scala] Allow selecting fields without string interpolation in Scala
This closes #12978.
---
.../flink/table/examples/scala/StreamSQLExample.scala | 4 ++--
.../apache/flink/table/examples/scala/WordCountTable.scala | 6 +++---
.../flink/table/api/ImplicitExpressionConversions.scala | 14 ++++++++++++++
.../flink/table/api/ExpressionsConsistencyCheckTest.scala | 11 +++++++----
4 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index f979514..6035347 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -77,9 +77,9 @@ object StreamSQLExample {
Order(4L, "beer", 1)))
// convert DataStream to Table
- val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
+ val tableA = tEnv.fromDataStream(orderA, $"user", $"product", $"amount")
// register DataStream as Table
- tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
+ tEnv.createTemporaryView("OrderB", orderB, $"user", $"product", $"amount")
// union the two tables
val result = tEnv.sqlQuery(
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
index a869d4d..7f667ce 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
@@ -45,9 +45,9 @@ object WordCountTable {
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable(tEnv)
val result = expr
- .groupBy('word)
- .select('word, 'frequency.sum as 'frequency)
- .filter('frequency === 2)
+ .groupBy($"word")
+ .select($"word", $"frequency".sum as "frequency")
+ .filter($"frequency" === 2)
.toDataSet[WC]
result.print()
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 540d233..23e45a3 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -364,6 +364,20 @@ trait ImplicitExpressionConversions {
// ----------------------------------------------------------------------------------------------
/**
+ * Creates an unresolved reference to a table's field.
+ *
+ * For example:
+ *
+ * ```
+ * tab.select($("key"), $("value"))
+ * ```
+ *
+ * This method is useful in cases where the field name is calculated and the recommended way of
+ * using string interpolation like `$"key"` would be inconvenient.
+ */
+ def $(name: String): Expression = Expressions.$(name)
+
+ /**
* Creates a SQL literal.
*
* The data type is derived from the object's class and its value.
diff --git a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
index c3699ad..e9ef414 100644
--- a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
+++ b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/api/ExpressionsConsistencyCheckTest.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.api
-import org.apache.flink.table.api.Expressions._
import org.apache.flink.table.expressions.ApiExpressionUtils._
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, PLUS, TRIM}
@@ -253,7 +252,7 @@ class ExpressionsConsistencyCheckTest {
def testInteroperability(): Unit = {
// In most cases it should be just fine to mix the two APIs.
// It should be discouraged though as it might have unforeseen side effects
- val expr = lit("ABC") === $"f0".plus($("f1")).trim()
+ val expr = lit("ABC") === $"f0".plus(Expressions.$("f1")).plus($("f2")).trim()
assertThat(
expr,
@@ -268,8 +267,12 @@ class ExpressionsConsistencyCheckTest {
valueLiteral(" "),
unresolvedCall(
PLUS,
- unresolvedRef("f0"),
- unresolvedRef("f1")
+ unresolvedCall(
+ PLUS,
+ unresolvedRef("f0"),
+ unresolvedRef("f1")
+ ),
+ unresolvedRef("f2")
)
)
)