Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 15:23:51 UTC
[flink] branch master updated: [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d903480 [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
d903480 is described below
commit d903480ef7abc95cd419e6a194db2276dd7eb4cb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jul 2 16:55:08 2019 +0200
[FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
This moves the Scala expression DSL to flink-table-api-scala.
Users of pure table programs should define their imports like:
import org.apache.flink.table.api._
TableEnvironment.create(...)
Users of the DataStream API should define their imports like:
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
StreamTableEnvironment.create(...)
This commit did not split the package object org.apache.flink.table.api.scala._ into
two parts yet because we want to give users the chance to update their imports.
This closes #8945.
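
As a rough illustration of the DataStream import guidance above, a small program might look like the following sketch. It assumes the Flink Scala streaming and Table API modules of this development line are on the classpath; the object name, sample data, field names, and job name are hypothetical and not part of the commit. The `toTable` and `toAppendStream` calls follow the Scaladoc touched by this change.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._        // expression DSL (moved by this commit)
import org.apache.flink.table.api.scala._  // DataStream bridge and conversions
import org.apache.flink.types.Row

// Hypothetical example object; not part of the commit.
object ImportSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Illustrative input data.
    val stream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))

    // Field names are given as Scala symbols via the expression DSL.
    val table = stream.toTable(tEnv, 'name, 'amount)
    val result = table.filter('amount > 1).select('name)

    // Convert back to a DataStream through the implicit table conversions.
    result.toAppendStream[Row].print()

    env.execute("import-sketch")
  }
}
```

A pure table program would presumably drop the `api.scala._` import and the DataStream conversions, and create its environment with `TableEnvironment.create(...)` instead.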
---
docs/dev/table/tableApi.md | 4 +
docs/dev/table/tableApi.zh.md | 4 +
flink-table/flink-table-api-scala-bridge/pom.xml | 9 +-
.../flink/table/api/scala/DataSetConversions.scala | 6 +-
.../table/api/scala/DataStreamConversions.scala | 6 +-
.../flink/table/api/scala/TableConversions.scala | 25 +-
.../org/apache/flink/table/api/scala/package.scala | 89 ++++++
flink-table/flink-table-api-scala/pom.xml | 47 +++
.../apache/flink/table/api}/expressionDsl.scala | 319 ++++++---------------
.../scala/org/apache/flink/table/api/package.scala | 48 ++++
.../scala/org/apache/flink/table/api/package.scala | 34 ---
.../org/apache/flink/table/api/scala/package.scala | 93 ------
12 files changed, 307 insertions(+), 377 deletions(-)
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a0bd51a..bd6bd38 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
{% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
// environment configuration
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
{% highlight scala %}
import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
// environment configuration
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 7a75c0a..e4b8a55 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
{% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
// environment configuration
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
{% highlight scala %}
import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
// environment configuration
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml
index 2a7f2ec..f49f71c 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -61,19 +61,18 @@ under the License.
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
+ <!-- Run Scala compiler in the process-resources phase, so that dependencies on
+ Scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
- <goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
+ <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+ Scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
index 4b92bdb..4d80e75 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
* @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
* @tparam T The type of the [[DataSet]].
*/
+@PublicEvolving
class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
/**
* Converts the [[DataSet]] into a [[Table]].
*
- * The field name of the new [[Table]] can be specified like this:
+ * The field names of the new [[Table]] can be specified like this:
*
* {{{
* val env = ExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
+ * val tEnv = BatchTableEnvironment.create(env)
*
* val set: DataSet[(String, Int)] = ...
* val table = set.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
index 3c31859..1360f5c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
* @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
* @tparam T The type of the [[DataStream]].
*/
+@PublicEvolving
class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
/**
* Converts the [[DataStream]] into a [[Table]].
*
- * The field name of the new [[Table]] can be specified like this:
+ * The field names of the new [[Table]] can be specified like this:
*
* {{{
* val env = StreamExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
+ * val tEnv = StreamTableEnvironment.create(env)
*
* val stream: DataStream[(String, Int)] = ...
* val table = stream.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 0f3c8eb..ff74c78 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -18,18 +18,19 @@
package org.apache.flink.table.api.scala
+import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException, ValidationException}
/**
* Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
*
* @param table The table to convert.
*/
+@PublicEvolving
class TableConversions(table: Table) {
private val internalTable = table.asInstanceOf[TableImpl]
@@ -48,10 +49,10 @@ class TableConversions(table: Table) {
def toDataSet[T: TypeInformation]: DataSet[T] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaBatchTableEnv =>
+ case tEnv: BatchTableEnvironment =>
tEnv.toDataSet(table)
case _ =>
- throw new TableException(
+ throw new ValidationException(
"Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
}
}
@@ -71,10 +72,10 @@ class TableConversions(table: Table) {
def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaBatchTableEnv =>
+ case tEnv: BatchTableEnvironment =>
tEnv.toDataSet(table, queryConfig)
case _ =>
- throw new TableException(
+ throw new ValidationException(
"Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
}
}
@@ -96,10 +97,10 @@ class TableConversions(table: Table) {
def toAppendStream[T: TypeInformation]: DataStream[T] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaStreamTableEnv =>
+ case tEnv: StreamTableEnvironment =>
tEnv.toAppendStream(table)
case _ =>
- throw new TableException(
+ throw new ValidationException(
"Only tables that originate from Scala DataStreams " +
"can be converted to Scala DataStreams.")
}
@@ -122,10 +123,10 @@ class TableConversions(table: Table) {
*/
def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaStreamTableEnv =>
+ case tEnv: StreamTableEnvironment =>
tEnv.toAppendStream(table, queryConfig)
case _ =>
- throw new TableException(
+ throw new ValidationException(
"Only tables that originate from Scala DataStreams " +
"can be converted to Scala DataStreams.")
}
@@ -141,7 +142,7 @@ class TableConversions(table: Table) {
def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaStreamTableEnv =>
+ case tEnv: StreamTableEnvironment =>
tEnv.toRetractStream(table)
case _ =>
throw new TableException(
@@ -163,7 +164,7 @@ class TableConversions(table: Table) {
queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
internalTable.getTableEnvironment match {
- case tEnv: ScalaStreamTableEnv =>
+ case tEnv: StreamTableEnvironment =>
tEnv.toRetractStream(table, queryConfig)
case _ =>
throw new TableException(
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..556c657
--- /dev/null
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.internal.TableImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+import _root_.scala.language.implicitConversions
+
+/**
+ * == Table & SQL API with Flink's DataStream API ==
+ *
+ * This package contains the API of the Table & SQL API that bridges to Flink's [[DataStream]] API
+ * for the Scala programming language. Users can create [[Table]]s from [[DataStream]]s on which
+ * relational operations can be performed. Tables can also be converted back to [[DataStream]]s for
+ * further processing.
+ *
+ * For accessing all API classes and implicit conversions, use the following imports:
+ *
+ * {{{
+ * import org.apache.flink.table.api._
+ * import org.apache.flink.table.api.scala._
+ * }}}
+ *
+ * More information about the entry points of the API can be found in [[StreamTableEnvironment]].
+ *
+ * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+ * [[ImplicitExpressionOperations]].
+ *
+ * Available implicit table-to-stream conversions are listed in this package object.
+ *
+ * Please refer to the website documentation about how to construct and run table programs that are
+ * connected to the DataStream API.
+ */
+package object scala extends ImplicitExpressionConversions {
+
+ // This package object should not extend from ImplicitExpressionConversions but would clash with
+ // "org.apache.flink.table.api._" therefore we postpone splitting the package object into
+ // two and let users update their imports first. All users should import both `api._` and
+ // `api.scala._`.
+
+ implicit def tableConversions(table: Table): TableConversions = {
+ new TableConversions(table.asInstanceOf[TableImpl])
+ }
+
+ implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+ new DataSetConversions[T](set, set.getType())
+ }
+
+ implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+ new DataStreamConversions[T](set, set.dataType)
+ }
+
+ implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+ val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+ if (!tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException("Table cannot be converted into a DataSet. " +
+ "It is not part of a batch table environment.")
+ }
+ tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+ }
+
+ implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+ val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+ if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {
+ throw new ValidationException("Table cannot be converted into a DataStream. " +
+ "It is not part of a stream table environment.")
+ }
+ tableEnv.asInstanceOf[StreamTableEnvironment].toAppendStream[Row](table)
+ }
+}
diff --git a/flink-table/flink-table-api-scala/pom.xml b/flink-table/flink-table-api-scala/pom.xml
index 7fd6f0a..37a1bcf 100644
--- a/flink-table/flink-table-api-scala/pom.xml
+++ b/flink-table/flink-table-api-scala/pom.xml
@@ -37,6 +37,7 @@ under the License.
<packaging>jar</packaging>
<dependencies>
+ <!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
@@ -47,5 +48,51 @@ under the License.
<artifactId>flink-table-api-java</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <!-- Run Scala compiler in the process-resources phase, so that dependencies on
+ Scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+ Scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Scala Code Style -->
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <configuration>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
similarity index 87%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
rename to flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index c84d5de..2248f8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.table.api.scala
+package org.apache.flink.table.api
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
+import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{DataTypes, Over, Table, ValidationException}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _}
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.utils.TypeConversions
@@ -41,6 +41,7 @@ import _root_.scala.language.implicitConversions
* These operations must be kept in sync with the parser in
* [[org.apache.flink.table.expressions.ExpressionParser]].
*/
+@PublicEvolving
trait ImplicitExpressionOperations {
private[flink] def expr: Expression
@@ -189,7 +190,9 @@ trait ImplicitExpressionOperations {
/**
* Indicates the range from left to right, i.e. [left, right], which can be used in columns
- * selection, e.g.: withColumns(1 to 3).
+ * selection.
+ *
+ * e.g. withColumns(1 to 3)
*/
def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other)
@@ -885,7 +888,7 @@ trait ImplicitExpressionOperations {
* @param name name of the field (similar to Flink's field expressions)
* @return value of the field
*/
- def get(name: String): Expression = unresolvedCall(GET, expr, name)
+ def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name))
/**
* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
@@ -894,7 +897,7 @@ trait ImplicitExpressionOperations {
* @param index position of the field
* @return value of the field
*/
- def get(index: Int): Expression = unresolvedCall(GET, expr, index)
+ def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index))
/**
* Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
@@ -997,8 +1000,13 @@ trait ImplicitExpressionOperations {
* Implicit conversions from Scala literals to [[Expression]] and from [[Expression]]
* to [[ImplicitExpressionOperations]].
*/
+@PublicEvolving
trait ImplicitExpressionConversions {
+ // ----------------------------------------------------------------------------------------------
+ // Implicit values
+ // ----------------------------------------------------------------------------------------------
+
/**
* Offset constant to be used in the `preceding` clause of unbounded [[Over]] windows. Use this
* constant for a time interval. Unbounded over windows start with the first row of a partition.
@@ -1026,6 +1034,10 @@ trait ImplicitExpressionConversions {
*/
implicit val CURRENT_RANGE: Expression = unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
+ // ----------------------------------------------------------------------------------------------
+ // Implicit conversions
+ // ----------------------------------------------------------------------------------------------
+
implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
def expr: Expression = e
}
@@ -1244,101 +1256,56 @@ trait ImplicitExpressionConversions {
convertArray(array)
}
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
-/**
- * Returns the current SQL date in UTC time zone.
- */
-object currentDate {
+ // ----------------------------------------------------------------------------------------------
+ // Implicit expressions in prefix notation
+ // ----------------------------------------------------------------------------------------------
/**
* Returns the current SQL date in UTC time zone.
*/
- def apply(): Expression = {
+ def currentDate(): Expression = {
unresolvedCall(CURRENT_DATE)
}
-}
-
-/**
- * Returns the current SQL time in UTC time zone.
- */
-object currentTime {
/**
* Returns the current SQL time in UTC time zone.
*/
- def apply(): Expression = {
+ def currentTime(): Expression = {
unresolvedCall(CURRENT_TIME)
}
-}
-
-/**
- * Returns the current SQL timestamp in UTC time zone.
- */
-object currentTimestamp {
/**
* Returns the current SQL timestamp in UTC time zone.
*/
- def apply(): Expression = {
+ def currentTimestamp(): Expression = {
unresolvedCall(CURRENT_TIMESTAMP)
}
-}
-
-/**
- * Returns the current SQL time in local time zone.
- */
-object localTime {
/**
* Returns the current SQL time in local time zone.
*/
- def apply(): Expression = {
+ def localTime(): Expression = {
unresolvedCall(LOCAL_TIME)
}
-}
-
-/**
- * Returns the current SQL timestamp in local time zone.
- */
-object localTimestamp {
/**
* Returns the current SQL timestamp in local time zone.
*/
- def apply(): Expression = {
+ def localTimestamp(): Expression = {
unresolvedCall(LOCAL_TIMESTAMP)
}
-}
-
-/**
- * Determines whether two anchored time intervals overlap. Time point and temporal are
- * transformed into a range defined by two time points (start, end). The function
- * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
- *
- * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
- *
- * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
- */
-object temporalOverlaps {
/**
* Determines whether two anchored time intervals overlap. Time point and temporal are
- * transformed into a range defined by two time points (start, end).
+ * transformed into a range defined by two time points (start, end). The function
+ * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
*
* It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
*
* e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
*/
- def apply(
+ def temporalOverlaps(
leftTimePoint: Expression,
leftTemporal: Expression,
rightTimePoint: Expression,
@@ -1346,17 +1313,6 @@ object temporalOverlaps {
: Expression = {
unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
}
-}
-
-/**
- * Formats a timestamp as a string using a specified format.
- * The format must be compatible with MySQL's date formatting syntax as used by the
- * date_parse function.
- *
- * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings
- * formatted as "2017, 05 May".
- */
-object dateFormat {
/**
* Formats a timestamp as a string using a specified format.
@@ -1369,21 +1325,12 @@ object dateFormat {
* @param format The format of the string.
* @return The formatted timestamp as string.
*/
- def apply(
+ def dateFormat(
timestamp: Expression,
format: Expression)
: Expression = {
unresolvedCall(DATE_FORMAT, timestamp, format)
}
-}
-
-/**
- * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
- *
- * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads
- * to 3.
- */
-object timestampDiff {
/**
* Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
@@ -1396,89 +1343,53 @@ object timestampDiff {
* @param timePoint2 The second point in time.
* @return The number of intervals as integer value.
*/
- def apply(
+ def timestampDiff(
timePointUnit: TimePointUnit,
timePoint1: Expression,
timePoint2: Expression)
: Expression = {
unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
}
-}
-
-/**
- * Creates an array of literals. The array will be an array of objects (not primitives).
- */
-object array {
/**
- * Creates an array of literals. The array will be an array of objects (not primitives).
+ * Creates an array of literals.
*/
- def apply(head: Expression, tail: Expression*): Expression = {
+ def array(head: Expression, tail: Expression*): Expression = {
unresolvedCall(ARRAY, head +: tail: _*)
}
-}
-
-/**
- * Creates a row of expressions.
- */
-object row {
/**
* Creates a row of expressions.
*/
- def apply(head: Expression, tail: Expression*): Expression = {
+ def row(head: Expression, tail: Expression*): Expression = {
unresolvedCall(ROW, head +: tail: _*)
}
-}
-
-/**
- * Creates a map of expressions. The map will be a map between two objects (not primitives).
- */
-object map {
/**
- * Creates a map of expressions. The map will be a map between two objects (not primitives).
+ * Creates a map of expressions.
*/
- def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
+ def map(key: Expression, value: Expression, tail: Expression*): Expression = {
unresolvedCall(MAP, key +: value +: tail: _*)
}
-}
-
-/**
- * Returns a value that is closer than any other value to pi.
- */
-object pi {
/**
* Returns a value that is closer than any other value to pi.
*/
- def apply(): Expression = {
+ def pi(): Expression = {
unresolvedCall(PI)
}
-}
-
-/**
- * Returns a value that is closer than any other value to e.
- */
-object e {
/**
* Returns a value that is closer than any other value to e.
*/
- def apply(): Expression = {
- unresolvedCall(FDE)
+ def e(): Expression = {
+ unresolvedCall(E)
}
-}
-
-/**
- * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
- */
-object rand {
/**
* Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
*/
- def apply(): Expression = {
+ def rand(): Expression = {
unresolvedCall(RAND)
}
@@ -1487,22 +1398,15 @@ object rand {
* initial seed. Two rand() functions will return identical sequences of numbers if they
* have same initial seed.
*/
- def apply(seed: Expression): Expression = {
+ def rand(seed: Expression): Expression = {
unresolvedCall(RAND, seed)
}
-}
-
-/**
- * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
- * value (exclusive).
- */
-object randInteger {
/**
* Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
* value (exclusive).
*/
- def apply(bound: Expression): Expression = {
+ def randInteger(bound: Expression): Expression = {
unresolvedCall(RAND_INTEGER, bound)
}
@@ -1511,59 +1415,35 @@ object randInteger {
* (exclusive) with a initial seed. Two randInteger() functions will return identical sequences
* of numbers if they have same initial seed and same bound.
*/
- def apply(seed: Expression, bound: Expression): Expression = {
+ def randInteger(seed: Expression, bound: Expression): Expression = {
unresolvedCall(RAND_INTEGER, seed, bound)
}
-}
-
-/**
- * Returns the string that results from concatenating the arguments.
- * Returns NULL if any argument is NULL.
- */
-object concat {
/**
* Returns the string that results from concatenating the arguments.
* Returns NULL if any argument is NULL.
*/
- def apply(string: Expression, strings: Expression*): Expression = {
+ def concat(string: Expression, strings: Expression*): Expression = {
unresolvedCall(CONCAT, string +: strings: _*)
}
-}
-
-/**
- * Calculates the arc tangent of a given coordinate.
- */
-object atan2 {
/**
* Calculates the arc tangent of a given coordinate.
*/
- def apply(y: Expression, x: Expression): Expression = {
+ def atan2(y: Expression, x: Expression): Expression = {
unresolvedCall(ATAN2, y, x)
}
-}
-/**
- * Returns the string that results from concatenating the arguments and separator.
- * Returns NULL If the separator is NULL.
- *
- * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
- * values after the separator argument.
- **/
-object concat_ws {
- def apply(separator: Expression, string: Expression, strings: Expression*): Expression = {
+ /**
+ * Returns the string that results from concatenating the arguments and separator.
+ * Returns NULL If the separator is NULL.
+ *
+ * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+ * values after the separator argument.
+ **/
+ def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
}
-}
-
-/**
- * Returns an UUID (Universally Unique Identifier) string (e.g.,
- * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
- * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
- * generator.
- */
-object uuid {
/**
* Returns an UUID (Universally Unique Identifier) string (e.g.,
@@ -1571,66 +1451,43 @@ object uuid {
* generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
* generator.
*/
- def apply(): Expression = {
- unresolvedCall(FDUUID)
+ def uuid(): Expression = {
+ unresolvedCall(UUID)
}
-}
-
-/**
- * Returns a null literal value of a given data type.
- *
- * e.g. nullOf(DataTypes.INT())
- */
-object nullOf {
/**
* Returns a null literal value of a given data type.
*
* e.g. nullOf(DataTypes.INT())
*/
- def apply(dataType: DataType): Expression = {
+ def nullOf(dataType: DataType): Expression = {
valueLiteral(null, dataType)
}
/**
- * @deprecated This method will be removed in future versions as it uses the old type system. It
- * is recommended to use [[apply(DataType)]] instead which uses the new type system
- * based on [[DataTypes]]. Please make sure to use either the old or the new type
- * system consistently to avoid unintended behavior. See the website documentation
- * for more information.
+ * @deprecated This method will be removed in future versions as it uses the old type system.
+ * It is recommended to use [[nullOf(DataType)]] instead which uses the new type
+ * system based on [[DataTypes]]. Please make sure to use either the old or the new
+ * type system consistently to avoid unintended behavior. See the website
+ * documentation for more information.
*/
- def apply(typeInfo: TypeInformation[_]): Expression = {
- apply(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+ def nullOf(typeInfo: TypeInformation[_]): Expression = {
+ nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
}
-}
-
-/**
- * Calculates the logarithm of the given value.
- */
-object log {
/**
- * Calculates the natural logarithm of the given value.
+ * Calculates the logarithm of the given value.
*/
- def apply(value: Expression): Expression = {
+ def log(value: Expression): Expression = {
unresolvedCall(LOG, value)
}
/**
* Calculates the logarithm of the given value to the given base.
*/
- def apply(base: Expression, value: Expression): Expression = {
+ def log(base: Expression, value: Expression): Expression = {
unresolvedCall(LOG, base, value)
}
-}
-
-/**
- * Ternary conditional operator that decides which of two other expressions should be evaluated
- * based on a evaluated boolean condition.
- *
- * e.g. ifThenElse(42 > 5, "A", "B") leads to "A"
- */
-object ifThenElse {
/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
@@ -1642,30 +1499,34 @@ object ifThenElse {
* @param ifTrue expression to be evaluated if condition holds
* @param ifFalse expression to be evaluated if condition does not hold
*/
- def apply(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
+ def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
unresolvedCall(IF, condition, ifTrue, ifFalse)
}
-}
-/**
- * Creates a withColumns expression.
- */
-object withColumns {
-
- def apply(head: Expression, tail: Expression*): Expression = {
+ /**
+ * Creates an expression that selects a range of columns. It can be used wherever an array of
+ * expressions is accepted, such as function calls, projections, or groupings.
+ *
+ * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+ * inclusive.
+ *
+ * e.g. withColumns('b to 'c) or withColumns('*)
+ */
+ def withColumns(head: Expression, tail: Expression*): Expression = {
unresolvedCall(WITH_COLUMNS, head +: tail: _*)
}
-}
-/**
- * Creates a withoutColumns expression.
- */
-object withoutColumns {
-
- def apply(head: Expression, tail: Expression*): Expression = {
+ /**
+ * Creates an expression that selects all columns except for the given range of columns. It can
+ * be used wherever an array of expressions is accepted, such as function calls, projections, or
+ * groupings.
+ *
+ * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+ * inclusive.
+ *
+ * e.g. withoutColumns('b to 'c) or withoutColumns('c)
+ */
+ def withoutColumns(head: Expression, tail: Expression*): Expression = {
unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
}
}
-
-
-// scalastyle:on object.name
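
Editor's sketch (not part of the commit): the hunks above replace one singleton object per function (`object concat { def apply(...) }`) with plain methods inside the enclosing trait. The miniature below illustrates why call sites can stay textually the same after such a change. `Expr`, `Lit`, `Call`, `OldDsl`, `NewDsl`, and `ExprFunctions` are hypothetical stand-ins introduced only for this illustration; they are not Flink classes, and the real code builds expressions through `unresolvedCall`.

```scala
// Hypothetical stand-ins for an expression tree; not the Flink types.
sealed trait Expr
final case class Lit(value: Any) extends Expr
final case class Call(name: String, args: Seq[Expr]) extends Expr

// Before: one singleton object per function, invoked through apply().
object OldDsl {
  object concat {
    def apply(string: Expr, strings: Expr*): Expr =
      Call("CONCAT", string +: strings)
  }
}

// After: plain (possibly overloaded) methods collected in a trait,
// which an object or package object can mix in.
trait ExprFunctions {
  def concat(string: Expr, strings: Expr*): Expr =
    Call("CONCAT", string +: strings)

  def log(value: Expr): Expr =
    Call("LOG", Seq(value))

  def log(base: Expr, value: Expr): Expr =
    Call("LOG", Seq(base, value))
}

object NewDsl extends ExprFunctions

object DslDemo {
  // Both call sites read the same; only the definition style differs.
  val viaObject: Expr = OldDsl.concat(Lit("a"), Lit("b"))
  val viaMethod: Expr = NewDsl.concat(Lit("a"), Lit("b"))
}
```

In this sketch, both values in `DslDemo` build the same tree, so source that only calls `concat(...)` would not need to change. Code that referred to the old objects as values would presumably need updating.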
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..2fb4f32
--- /dev/null
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, TableEnvironment}
+
+/**
+ * == Table & SQL API ==
+ *
+ * This package contains the API of the Table & SQL API. Users create [[Table]]s on which
+ * relational operations can be performed.
+ *
+ * For accessing all API classes and implicit conversions, use the following imports:
+ *
+ * {{{
+ * import org.apache.flink.table.api._
+ * }}}
+ *
+ * More information about the entry points of the API can be found in [[TableEnvironment]].
+ *
+ * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+ * [[ImplicitExpressionOperations]].
+ *
+ * Please refer to the website documentation about how to construct and run table programs.
+ */
+package object api /* extends ImplicitExpressionConversions */ {
+
+ // This package object should extend ImplicitExpressionConversions, but doing so would clash
+ // with "org.apache.flink.table.api.scala._". Therefore, we postpone splitting the package
+ // object into two and let users update their imports first.
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
deleted file mode 100644
index 64e7801..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-/**
- * == Table API ==
- *
- * This package contains the API of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.table.api.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object api
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
deleted file mode 100644
index 8e4855b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
-
-import _root_.scala.language.implicitConversions
-
-/**
- * == Table API (Scala) ==
- *
- * Importing this package with:
- *
- * {{{
- * import org.apache.flink.table.api.scala._
- * }}}
- *
- * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
- * [[Table]]. This can be used to perform SQL-like queries on data. Please have
- * a look at [[Table]] to see which operations are supported and
- * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
- * expression can be specified.
- *
- * When writing a query you can use Scala Symbols to refer to field names. One would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
- * Scala literal to an expression literal, in those cases use `.toExpr`, as in `3.toExpr`.
- *
- * Example:
- *
- * {{{
- * import org.apache.flink.api.scala._
- * import org.apache.flink.table.api.scala._
- *
- * val env = ExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- * val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
- * val result = input
- * .toTable(tEnv, 'word, 'count)
- * .groupBy('word)
- * .select('word, 'count.avg)
- *
- * result.print()
- * }}}
- *
- */
-package object scala extends ImplicitExpressionConversions {
-
- implicit def table2TableConversions(table: Table): TableConversions = {
- new TableConversions(table.asInstanceOf[TableImpl])
- }
-
- implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
- new DataSetConversions[T](set, set.getType())
- }
-
- implicit def table2RowDataSet(table: Table): DataSet[Row] = {
- val tableEnv =
- table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaBatchTableEnv]
- tableEnv.toDataSet[Row](table)
- }
-
- implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
- new DataStreamConversions[T](set, set.dataType)
- }
-
- implicit def table2RowDataStream(table: Table): DataStream[Row] = {
- val tableEnv =
- table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaStreamTableEnv]
- tableEnv.toAppendStream[Row](table)
- }
-}
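
Editor's sketch (not part of the commit): the package object removed above relies on two kinds of implicit conversions: wrapper conversions that add methods to an existing type (as `dataSet2DataSetConversions` does), and direct conversions from a table back to a collection type (as `table2RowDataSet` does). The self-contained miniature below shows both patterns. `MiniTable`, `SeqConversions`, and `MiniConversions` are hypothetical names used only here and do not exist in Flink.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for a table; not Flink's Table.
final case class MiniTable(rows: Seq[(String, Int)])

// Wrapper class that carries the extra method, in the style of DataSetConversions.
final class SeqConversions(rows: Seq[(String, Int)]) {
  def toTable: MiniTable = MiniTable(rows)
}

object MiniConversions {
  // Wrapper conversion: makes `.toTable` available on a plain Seq.
  implicit def seq2SeqConversions(rows: Seq[(String, Int)]): SeqConversions =
    new SeqConversions(rows)

  // Direct conversion: lets a MiniTable be used where a Seq is expected.
  implicit def table2Rows(table: MiniTable): Seq[(String, Int)] =
    table.rows
}

object ConversionDemo {
  // A single wildcard import brings both conversions into scope.
  import MiniConversions._

  val table: MiniTable = Seq(("Hello", 2), ("Ciao", 3)).toTable
  val rows: Seq[(String, Int)] = table
}
```

As in the removed package object, the conversions apply only where the wildcard import is in scope, which is why the commit message asks users to adjust their imports.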