Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 15:23:51 UTC

[flink] branch master updated: [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d903480  [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
d903480 is described below

commit d903480ef7abc95cd419e6a194db2276dd7eb4cb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jul 2 16:55:08 2019 +0200

    [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
    
    This moves the Scala expression DSL to flink-table-api-scala.
    
    Users of pure table programs should define their imports like:
    
    import org.apache.flink.table.api._
    TableEnvironment.create(...)
    
    Users of the DataStream API should define their imports like:
    
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.scala._
    StreamTableEnvironment.create(...)
    
    This commit did not split the package object org.apache.flink.table.api.scala._ into
    two parts yet because we want to give users the chance to update their imports.
    
    This closes #8945.
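
The import guidance in the commit message above can be sketched as a small DataStream-based program. This is an illustration only, not part of the commit: the object name `ImportSketch` and the sample data are hypothetical, and it assumes that flink-streaming-scala, flink-table-api-scala-bridge and a planner module are on the classpath.

```scala
// Sketch only (not part of the commit). Assumes the Flink streaming Scala API,
// flink-table-api-scala-bridge and a table planner are available at build time.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._        // expression DSL and core Table API classes
import org.apache.flink.table.api.scala._  // DataStream/DataSet bridging conversions

// Hypothetical object name, chosen for illustration.
object ImportSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Hypothetical sample data.
    val stream: DataStream[(String, Int)] = env.fromElements(("apple", 3), ("pear", 5))

    // toTable comes from the implicit DataStreamConversions in api.scala._
    val table = stream.toTable(tEnv, 'name, 'amount)

    // 'amount > 4 relies on the implicit expression conversions of the DSL.
    val filtered = table.filter('amount > 4).select('name)

    filtered.printSchema()
  }
}
```

Importing both packages is what the commit message recommends for DataStream users, so that the program should keep compiling if the `api.scala` package object later stops extending `ImplicitExpressionConversions`.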
---
 docs/dev/table/tableApi.md                         |   4 +
 docs/dev/table/tableApi.zh.md                      |   4 +
 flink-table/flink-table-api-scala-bridge/pom.xml   |   9 +-
 .../flink/table/api/scala/DataSetConversions.scala |   6 +-
 .../table/api/scala/DataStreamConversions.scala    |   6 +-
 .../flink/table/api/scala/TableConversions.scala   |  25 +-
 .../org/apache/flink/table/api/scala/package.scala |  89 ++++++
 flink-table/flink-table-api-scala/pom.xml          |  47 +++
 .../apache/flink/table/api}/expressionDsl.scala    | 319 ++++++---------------
 .../scala/org/apache/flink/table/api/package.scala |  48 ++++
 .../scala/org/apache/flink/table/api/package.scala |  34 ---
 .../org/apache/flink/table/api/scala/package.scala |  93 ------
 12 files changed, 307 insertions(+), 377 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a0bd51a..bd6bd38 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 7a75c0a..e4b8a55 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml
index 2a7f2ec..f49f71c 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -61,19 +61,18 @@ under the License.
 				<groupId>net.alchim31.maven</groupId>
 				<artifactId>scala-maven-plugin</artifactId>
 				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
+					<!-- Run Scala compiler in the process-resources phase, so that dependencies on
+						Scala classes can be resolved later in the (Java) compile phase -->
 					<execution>
 						<id>scala-compile-first</id>
 						<phase>process-resources</phase>
 						<goals>
-							<goal>add-source</goal>
 							<goal>compile</goal>
 						</goals>
 					</execution>
 
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+						 Scala classes can be resolved later in the (Java) test-compile phase -->
 					<execution>
 						<id>scala-test-compile</id>
 						<phase>process-test-resources</phase>
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
index 4b92bdb..4d80e75 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
   * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
   * @tparam T The type of the [[DataSet]].
   */
+@PublicEvolving
 class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataSet]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = ExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = BatchTableEnvironment.create(env)
     *
     *   val set: DataSet[(String, Int)] = ...
     *   val table = set.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
index 3c31859..1360f5c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
   * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
   * @tparam T The type of the [[DataStream]].
   */
+@PublicEvolving
 class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataStream]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = StreamExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = StreamTableEnvironment.create(env)
     *
     *   val stream: DataStream[(String, Int)] = ...
     *   val table = stream.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 0f3c8eb..ff74c78 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException, ValidationException}
 
 /**
   * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
   *
   * @param table The table to convert.
   */
+@PublicEvolving
 class TableConversions(table: Table) {
 
   private val internalTable = table.asInstanceOf[TableImpl]
@@ -48,10 +49,10 @@ class TableConversions(table: Table) {
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
     }
   }
@@ -71,10 +72,10 @@ class TableConversions(table: Table) {
   def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
     }
   }
@@ -96,10 +97,10 @@ class TableConversions(table: Table) {
   def toAppendStream[T: TypeInformation]: DataStream[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -122,10 +123,10 @@ class TableConversions(table: Table) {
     */
   def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -141,7 +142,7 @@ class TableConversions(table: Table) {
   def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table)
       case _ =>
         throw new TableException(
@@ -163,7 +164,7 @@ class TableConversions(table: Table) {
       queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table, queryConfig)
       case _ =>
         throw new TableException(
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..556c657
--- /dev/null
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.internal.TableImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+import _root_.scala.language.implicitConversions
+
+/**
+  * == Table & SQL API with Flink's DataStream API ==
+  *
+  * This package contains the API of the Table & SQL API that bridges to Flink's [[DataStream]] API
+  * for the Scala programming language. Users can create [[Table]]s from [[DataStream]]s on which
+  * relational operations can be performed. Tables can also be converted back to [[DataStream]]s for
+  * further processing.
+  *
+  * For accessing all API classes and implicit conversions, use the following imports:
+  *
+  * {{{
+  *   import org.apache.flink.table.api._
+  *   import org.apache.flink.table.api.scala._
+  * }}}
+  *
+  * More information about the entry points of the API can be found in [[StreamTableEnvironment]].
+  *
+  * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+  * [[ImplicitExpressionOperations]].
+  *
+  * Available implicit table-to-stream conversions are listed in this package object.
+  *
+  * Please refer to the website documentation about how to construct and run table programs that are
+  * connected to the DataStream API.
+  */
+package object scala extends ImplicitExpressionConversions {
+
+  // This package object should not extend from ImplicitExpressionConversions but would clash with
+  // "org.apache.flink.table.api._" therefore we postpone splitting the package object into
+  // two and let users update there imports first. All users should import both `api._` and
+  // `api.scala._`.
+
+  implicit def tableConversions(table: Table): TableConversions = {
+    new TableConversions(table.asInstanceOf[TableImpl])
+  }
+
+  implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType())
+  }
+
+  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](set, set.dataType)
+  }
+
+  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataSet. " +
+        "It is not part of a batch table environment.")
+    }
+    tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+  }
+
+  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataStream. " +
+        "It is not part of a stream table environment.")
+    }
+    tableEnv.asInstanceOf[StreamTableEnvironment].toAppendStream[Row](table)
+  }
+}
diff --git a/flink-table/flink-table-api-scala/pom.xml b/flink-table/flink-table-api-scala/pom.xml
index 7fd6f0a..37a1bcf 100644
--- a/flink-table/flink-table-api-scala/pom.xml
+++ b/flink-table/flink-table-api-scala/pom.xml
@@ -37,6 +37,7 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+		<!-- Flink dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-common</artifactId>
@@ -47,5 +48,51 @@ under the License.
 			<artifactId>flink-table-api-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<!-- External dependencies -->
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<executions>
+					<!-- Run Scala compiler in the process-resources phase, so that dependencies on
+						Scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+						 Scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
similarity index 87%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
rename to flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index c84d5de..2248f8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -15,18 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.table.api.scala
+package org.apache.flink.table.api
 
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{DataTypes, Over, Table, ValidationException}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.utils.TypeConversions
@@ -41,6 +41,7 @@ import _root_.scala.language.implicitConversions
   * These operations must be kept in sync with the parser in
   * [[org.apache.flink.table.expressions.ExpressionParser]].
   */
+@PublicEvolving
 trait ImplicitExpressionOperations {
   private[flink] def expr: Expression
 
@@ -189,7 +190,9 @@ trait ImplicitExpressionOperations {
 
   /**
     * Indicates the range from left to right, i.e. [left, right], which can be used in columns
-    * selection, e.g.: withColumns(1 to 3).
+    * selection.
+    *
+    * e.g. withColumns(1 to 3)
     */
   def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other)
 
@@ -885,7 +888,7 @@ trait ImplicitExpressionOperations {
     * @param name name of the field (similar to Flink's field expressions)
     * @return value of the field
     */
-  def get(name: String): Expression = unresolvedCall(GET, expr, name)
+  def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name))
 
   /**
     * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
@@ -894,7 +897,7 @@ trait ImplicitExpressionOperations {
     * @param index position of the field
     * @return value of the field
     */
-  def get(index: Int): Expression = unresolvedCall(GET, expr, index)
+  def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index))
 
   /**
     * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
@@ -997,8 +1000,13 @@ trait ImplicitExpressionOperations {
   * Implicit conversions from Scala literals to [[Expression]] and from [[Expression]]
   * to [[ImplicitExpressionOperations]].
   */
+@PublicEvolving
 trait ImplicitExpressionConversions {
 
+  // ----------------------------------------------------------------------------------------------
+  // Implicit values
+  // ----------------------------------------------------------------------------------------------
+
   /**
     * Offset constant to be used in the `preceding` clause of unbounded [[Over]] windows. Use this
     * constant for a time interval. Unbounded over windows start with the first row of a partition.
@@ -1026,6 +1034,10 @@ trait ImplicitExpressionConversions {
     */
   implicit val CURRENT_RANGE: Expression = unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
 
+  // ----------------------------------------------------------------------------------------------
+  // Implicit conversions
+  // ----------------------------------------------------------------------------------------------
+
   implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
     def expr: Expression = e
   }
@@ -1244,101 +1256,56 @@ trait ImplicitExpressionConversions {
 
     convertArray(array)
   }
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
 
-/**
-  * Returns the current SQL date in UTC time zone.
-  */
-object currentDate {
+  // ----------------------------------------------------------------------------------------------
+  // Implicit expressions in prefix notation
+  // ----------------------------------------------------------------------------------------------
 
   /**
     * Returns the current SQL date in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentDate(): Expression = {
     unresolvedCall(CURRENT_DATE)
   }
-}
-
-/**
-  * Returns the current SQL time in UTC time zone.
-  */
-object currentTime {
 
   /**
     * Returns the current SQL time in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentTime(): Expression = {
     unresolvedCall(CURRENT_TIME)
   }
-}
-
-/**
-  * Returns the current SQL timestamp in UTC time zone.
-  */
-object currentTimestamp {
 
   /**
     * Returns the current SQL timestamp in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentTimestamp(): Expression = {
     unresolvedCall(CURRENT_TIMESTAMP)
   }
-}
-
-/**
-  * Returns the current SQL time in local time zone.
-  */
-object localTime {
 
   /**
     * Returns the current SQL time in local time zone.
     */
-  def apply(): Expression = {
+  def localTime(): Expression = {
     unresolvedCall(LOCAL_TIME)
   }
-}
-
-/**
-  * Returns the current SQL timestamp in local time zone.
-  */
-object localTimestamp {
 
   /**
     * Returns the current SQL timestamp in local time zone.
     */
-  def apply(): Expression = {
+  def localTimestamp(): Expression = {
     unresolvedCall(LOCAL_TIMESTAMP)
   }
-}
-
-/**
-  * Determines whether two anchored time intervals overlap. Time point and temporal are
-  * transformed into a range defined by two time points (start, end). The function
-  * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
-  *
-  * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
-  *
-  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
-  */
-object temporalOverlaps {
 
   /**
     * Determines whether two anchored time intervals overlap. Time point and temporal are
-    * transformed into a range defined by two time points (start, end).
+    * transformed into a range defined by two time points (start, end). The function
+    * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
     *
     * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
     *
     * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
     */
-  def apply(
+  def temporalOverlaps(
       leftTimePoint: Expression,
       leftTemporal: Expression,
       rightTimePoint: Expression,
@@ -1346,17 +1313,6 @@ object temporalOverlaps {
     : Expression = {
     unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
-}
-
-/**
-  * Formats a timestamp as a string using a specified format.
-  * The format must be compatible with MySQL's date formatting syntax as used by the
-  * date_parse function.
-  *
-  * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings
-  * formatted as "2017, 05 May".
-  */
-object dateFormat {
 
   /**
     * Formats a timestamp as a string using a specified format.
@@ -1369,21 +1325,12 @@ object dateFormat {
     * @param format The format of the string.
     * @return The formatted timestamp as string.
     */
-  def apply(
+  def dateFormat(
       timestamp: Expression,
       format: Expression)
     : Expression = {
     unresolvedCall(DATE_FORMAT, timestamp, format)
   }
-}
-
-/**
-  * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
-  *
-  * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads
-  * to 3.
-  */
-object timestampDiff {
 
   /**
     * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
@@ -1396,89 +1343,53 @@ object timestampDiff {
     * @param timePoint2 The second point in time.
     * @return The number of intervals as integer value.
     */
-  def apply(
+  def timestampDiff(
       timePointUnit: TimePointUnit,
       timePoint1: Expression,
       timePoint2: Expression)
     : Expression = {
     unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
   }
-}
-
-/**
-  * Creates an array of literals. The array will be an array of objects (not primitives).
-  */
-object array {
 
   /**
-    * Creates an array of literals. The array will be an array of objects (not primitives).
+    * Creates an array of literals.
     */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def array(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ARRAY, head +: tail: _*)
   }
-}
-
-/**
-  * Creates a row of expressions.
-  */
-object row {
 
   /**
     * Creates a row of expressions.
     */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def row(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ROW, head +: tail: _*)
   }
-}
-
-/**
-  * Creates a map of expressions. The map will be a map between two objects (not primitives).
-  */
-object map {
 
   /**
-    * Creates a map of expressions. The map will be a map between two objects (not primitives).
+    * Creates a map of expressions.
     */
-  def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
+  def map(key: Expression, value: Expression, tail: Expression*): Expression = {
     unresolvedCall(MAP, key +: value +: tail: _*)
   }
-}
-
-/**
-  * Returns a value that is closer than any other value to pi.
-  */
-object pi {
 
   /**
     * Returns a value that is closer than any other value to pi.
     */
-  def apply(): Expression = {
+  def pi(): Expression = {
     unresolvedCall(PI)
   }
-}
-
-/**
-  * Returns a value that is closer than any other value to e.
-  */
-object e {
 
   /**
     * Returns a value that is closer than any other value to e.
     */
-  def apply(): Expression = {
-    unresolvedCall(FDE)
+  def e(): Expression = {
+    unresolvedCall(E)
   }
-}
-
-/**
-  * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
-  */
-object rand {
 
   /**
     * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
     */
-  def apply(): Expression = {
+  def rand(): Expression = {
     unresolvedCall(RAND)
   }
 
@@ -1487,22 +1398,15 @@ object rand {
     * initial seed. Two rand() functions will return identical sequences of numbers if they
     * have same initial seed.
     */
-  def apply(seed: Expression): Expression = {
+  def rand(seed: Expression): Expression = {
     unresolvedCall(RAND, seed)
   }
-}
-
-/**
-  * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
-  * value (exclusive).
-  */
-object randInteger {
 
   /**
     * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
     * value (exclusive).
     */
-  def apply(bound: Expression): Expression = {
+  def randInteger(bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, bound)
   }
 
@@ -1511,59 +1415,35 @@ object randInteger {
     * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences
     * of numbers if they have same initial seed and same bound.
     */
-  def apply(seed: Expression, bound: Expression): Expression = {
+  def randInteger(seed: Expression, bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, seed, bound)
   }
-}
-
-/**
-  * Returns the string that results from concatenating the arguments.
-  * Returns NULL if any argument is NULL.
-  */
-object concat {
 
   /**
     * Returns the string that results from concatenating the arguments.
     * Returns NULL if any argument is NULL.
     */
-  def apply(string: Expression, strings: Expression*): Expression = {
+  def concat(string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT, string +: strings: _*)
   }
-}
-
-/**
-  * Calculates the arc tangent of a given coordinate.
-  */
-object atan2 {
 
   /**
     * Calculates the arc tangent of a given coordinate.
     */
-  def apply(y: Expression, x: Expression): Expression = {
+  def atan2(y: Expression, x: Expression): Expression = {
     unresolvedCall(ATAN2, y, x)
   }
-}
 
-/**
-  * Returns the string that results from concatenating the arguments and separator.
-  * Returns NULL If the separator is NULL.
-  *
-  * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
-  * values after the separator argument.
-  **/
-object concat_ws {
-  def apply(separator: Expression, string: Expression, strings: Expression*): Expression = {
+  /**
+    * Returns the string that results from concatenating the arguments and separator.
+    * Returns NULL if the separator is NULL.
+    *
+    * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+    * values after the separator argument.
+    **/
+  def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
   }
-}
-
-/**
-  * Returns an UUID (Universally Unique Identifier) string (e.g.,
-  * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
-  * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
-  * generator.
-  */
-object uuid {
 
   /**
     * Returns an UUID (Universally Unique Identifier) string (e.g.,
@@ -1571,66 +1451,43 @@ object uuid {
     * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
     * generator.
     */
-  def apply(): Expression = {
-    unresolvedCall(FDUUID)
+  def uuid(): Expression = {
+    unresolvedCall(UUID)
   }
-}
-
-/**
-  * Returns a null literal value of a given data type.
-  *
-  * e.g. nullOf(DataTypes.INT())
-  */
-object nullOf {
 
   /**
     * Returns a null literal value of a given data type.
     *
     * e.g. nullOf(DataTypes.INT())
     */
-  def apply(dataType: DataType): Expression = {
+  def nullOf(dataType: DataType): Expression = {
     valueLiteral(null, dataType)
   }
 
   /**
-    * @deprecated This method will be removed in future versions as it uses the old type system. It
-    *             is recommended to use [[apply(DataType)]] instead which uses the new type system
-    *             based on [[DataTypes]]. Please make sure to use either the old or the new type
-    *             system consistently to avoid unintended behavior. See the website documentation
-    *             for more information.
+    * @deprecated This method will be removed in future versions as it uses the old type system.
+    *             It is recommended to use [[nullOf(DataType)]] instead which uses the new type
+    *             system based on [[DataTypes]]. Please make sure to use either the old or the new
+    *             type system consistently to avoid unintended behavior. See the website
+    *             documentation for more information.
     */
-  def apply(typeInfo: TypeInformation[_]): Expression = {
-    apply(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+  def nullOf(typeInfo: TypeInformation[_]): Expression = {
+    nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
   }
-}
-
-/**
-  * Calculates the logarithm of the given value.
-  */
-object log {
 
   /**
-    * Calculates the natural logarithm of the given value.
+    * Calculates the logarithm of the given value.
     */
-  def apply(value: Expression): Expression = {
+  def log(value: Expression): Expression = {
     unresolvedCall(LOG, value)
   }
 
   /**
     * Calculates the logarithm of the given value to the given base.
     */
-  def apply(base: Expression, value: Expression): Expression = {
+  def log(base: Expression, value: Expression): Expression = {
     unresolvedCall(LOG, base, value)
   }
-}
-
-/**
-  * Ternary conditional operator that decides which of two other expressions should be evaluated
-  * based on a evaluated boolean condition.
-  *
-  * e.g. ifThenElse(42 > 5, "A", "B") leads to "A"
-  */
-object ifThenElse {
 
   /**
     * Ternary conditional operator that decides which of two other expressions should be evaluated
@@ -1642,30 +1499,34 @@ object ifThenElse {
     * @param ifTrue expression to be evaluated if condition holds
     * @param ifFalse expression to be evaluated if condition does not hold
     */
-  def apply(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
+  def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
     unresolvedCall(IF, condition, ifTrue, ifFalse)
   }
-}
 
-/**
-  * Creates a withColumns expression.
-  */
-object withColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+    * Creates an expression that selects a range of columns. It can be used wherever an array of
+    * expressions is accepted such as function calls, projections, or groupings.
+    *
+    * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+    * inclusive.
+    *
+    * e.g. withColumns('b to 'c) or withColumns('*)
+    */
+  def withColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITH_COLUMNS, head +: tail: _*)
   }
-}
 
-/**
-  * Creates a withoutColumns expression.
-  */
-object withoutColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+    * Creates an expression that selects all columns except for the given range of columns. It can
+    * be used wherever an array of expressions is accepted such as function calls, projections, or
+    * groupings.
+    *
+    * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+    * inclusive.
+    *
+    * e.g. withoutColumns('b to 'c) or withoutColumns('c)
+    */
+  def withoutColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
   }
 }
-
-
-// scalastyle:on object.name
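
Note on the hunks above: each built-in function used to be a singleton object with an apply() method (object pi, object concat, ...) and is now a plain method with the same name and parameters, apparently inside the ImplicitExpressionConversions trait that the package object mixes in. The following is a minimal, self-contained sketch of that pattern and not Flink code. Expr, Call, Lit, OldDsl and NewDsl are hypothetical stand-ins invented for illustration. It is meant to show that a call such as concat(a, b) is written the same way under both styles.

```scala
// Hypothetical stand-ins for an expression tree; these are NOT Flink classes.
sealed trait Expr
final case class Call(name: String, args: Seq[Expr]) extends Expr
final case class Lit(value: Any) extends Expr

// Old style: one singleton object per function, invoked through apply().
object OldDsl {
  object concat {
    def apply(string: Expr, strings: Expr*): Expr = Call("CONCAT", string +: strings)
  }
  object pi {
    def apply(): Expr = Call("PI", Seq.empty)
  }
}

// New style: plain methods in a trait, so a package object can mix them in.
trait NewDsl {
  def concat(string: Expr, strings: Expr*): Expr = Call("CONCAT", string +: strings)
  def pi(): Expr = Call("PI", Seq.empty)
}
object NewDsl extends NewDsl

object DslDemo {
  def main(args: Array[String]): Unit = {
    val a = Lit("a")
    val b = Lit("b")

    // The call sites look identical; only the import differs.
    val oldTree = { import OldDsl._; concat(a, b) }
    val newTree = { import NewDsl._; concat(a, b) }

    assert(oldTree == newTree)
    assert(OldDsl.pi() == NewDsl.pi())
    println("old and new call styles build the same expression tree")
  }
}
```

Under these assumptions, source code that only calls the functions keeps compiling after such a move. Code that referred to the objects themselves (for example, passing concat around as a value) would need to change.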
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..2fb4f32
--- /dev/null
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, TableEnvironment}
+
+/**
+  * == Table & SQL API ==
+  *
+  * This package contains the API of the Table & SQL API. Users create [[Table]]s on which
+  * relational operations can be performed.
+  *
+  * For accessing all API classes and implicit conversions, use the following imports:
+  *
+  * {{{
+  *   import org.apache.flink.table.api._
+  * }}}
+  *
+  * More information about the entry points of the API can be found in [[TableEnvironment]].
+  *
+  * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+  * [[ImplicitExpressionOperations]].
+  *
+  * Please refer to the website documentation about how to construct and run table programs.
+  */
+package object api /* extends ImplicitExpressionConversions */ {
+
+  // This package object should extend from ImplicitExpressionConversions but would clash with
+  // "org.apache.flink.table.api.scala._" therefore we postpone splitting the package object into
+  // two and let users update their imports first
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
deleted file mode 100644
index 64e7801..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-/**
- * == Table API ==
- *
- * This package contains the API of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.table.api.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object api
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
deleted file mode 100644
index 8e4855b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
-
-import _root_.scala.language.implicitConversions
-
-/**
-  * == Table API (Scala) ==
-  *
-  * Importing this package with:
-  *
-  * {{{
-  *   import org.apache.flink.table.api.scala._
-  * }}}
-  *
-  * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
-  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
-  * a look at [[Table]] to see which operations are supported and
-  * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
-  * expression can be specified.
-  *
-  * When writing a query you can use Scala Symbols to refer to field names. One would
-  * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
-  * Scala literal to an expression literal, in those cases use `.toExpr`, as in `3.toExpr`.
-  *
-  * Example:
-  *
-  * {{{
-  *   import org.apache.flink.api.scala._
-  *   import org.apache.flink.table.api.scala._
-  *
-  *   val env = ExecutionEnvironment.getExecutionEnvironment
-  *   val tEnv = TableEnvironment.getTableEnvironment(env)
-  *
-  *   val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
-  *   val result = input
-  *         .toTable(tEnv, 'word, 'count)
-  *         .groupBy('word)
-  *         .select('word, 'count.avg)
-  *
-  *   result.print()
-  * }}}
-  *
-  */
-package object scala extends ImplicitExpressionConversions {
-
-  implicit def table2TableConversions(table: Table): TableConversions = {
-    new TableConversions(table.asInstanceOf[TableImpl])
-  }
-
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType())
-  }
-
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaBatchTableEnv]
-    tableEnv.toDataSet[Row](table)
-  }
-
-  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](set, set.dataType)
-  }
-
-  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaStreamTableEnv]
-    tableEnv.toAppendStream[Row](table)
-  }
-}