Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:10 UTC

[41/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
new file mode 100644
index 0000000..f13c042
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class PlanJsonParser {
+
+	public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
+		ObjectMapper objectMapper = new ObjectMapper();
+
+		// not every plan node has the same fields, so ignore unknown properties
+		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		PlanTree tree = objectMapper.readValue(t, PlanTree.class);
+		LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		int tabCount = 0;
+
+		for (int index = 0; index < tree.getNodes().size(); index++) {
+			Node tempNode = tree.getNodes().get(index);
+
+			// inputs of an operation such as join or union are siblings, keep the same indent
+			if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
+				tabCount = map.get(tempNode.getPact());
+			}
+			else {
+				map.put(tempNode.getPact(), tabCount);
+			}
+
+			printTab(tabCount, pw);
+			pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
+
+			printTab(tabCount + 1, pw);
+			String content = tempNode.getContents();
+
+			// drop the hash code of the object instance
+			int dele = tempNode.getContents().indexOf("@");
+			if (dele > -1) {
+				content = tempNode.getContents().substring(0, dele);
+			}
+
+			// use a fixed content for data source nodes so that unit tests pass,
+			// because the Java and Scala APIs create their collection input
+			// elements differently
+			if (tempNode.getPact().equals("Data Source")) {
+				content = "collect elements with CollectionInputFormat";
+			}
+			pw.print("content : " + content + "\n");
+
+			List<Predecessors> predecessors = tempNode.getPredecessors();
+			if (predecessors != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+
+				String mode = predecessors.get(0).getExchange_mode();
+				if (mode != null) {
+					printTab(tabCount + 1, pw);
+					pw.print("exchange_mode : " + mode + "\n");
+				}
+			}
+
+			if (tempNode.getDriver_strategy() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+			}
+
+			if (tempNode.getGlobal_properties() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+			}
+
+			if (extended) {
+				List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+				for (int i = 1; i < globalProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(globalProperties.get(i).getName() + " : "
+					+ globalProperties.get(i).getValue() + "\n");
+				}
+
+				List<LocalProperty> localProperties = tempNode.getLocal_properties();
+				for (int i = 0; i < localProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(localProperties.get(i).getName() + " : "
+					+ localProperties.get(i).getValue() + "\n");
+				}
+
+				List<Estimates> estimates = tempNode.getEstimates();
+				for (int i = 0; i < estimates.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(estimates.get(i).getName() + " : "
+					+ estimates.get(i).getValue() + "\n");
+				}
+
+				List<Costs> costs = tempNode.getCosts();
+				for (int i = 0; i < costs.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(costs.get(i).getName() + " : "
+					+ costs.get(i).getValue() + "\n");
+				}
+
+				List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
+				for (int i = 0; i < compilerHintses.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(compilerHintses.get(i).getName() + " : "
+					+ compilerHintses.get(i).getValue() + "\n");
+				}
+			}
+			tabCount++;
+			pw.print("\n");
+		}
+		pw.close();
+		return sw.toString();
+	}
+
+	private static void printTab(int tabCount, PrintWriter pw) {
+		for (int i = 0; i < tabCount; i++)
+			pw.print("\t");
+	}
+}
+
+class PlanTree {
+	private List<Node> nodes;
+
+	public List<Node> getNodes() {
+		return nodes;
+	}
+}
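
Although not part of this commit, a minimal usage sketch of the new parser may help: the JSON plan of a batch program (as returned by ExecutionEnvironment#getExecutionPlan, which requires at least one sink) is turned into the indented textual plan. The output path below is a placeholder.

    import org.apache.flink.api.scala._
    import org.apache.flink.table.explain.PlanJsonParser

    val env = ExecutionEnvironment.getExecutionEnvironment
    // a sink is needed so that a plan can be generated
    env.fromElements(1, 2, 3).writeAsText("/tmp/plan-sketch-out")

    val json = env.getExecutionPlan()
    // the second argument toggles the extended properties (local properties,
    // estimates, costs, compiler hints)
    val plan = PlanJsonParser.getSqlExecutionPlan(json, false)
    println(plan)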

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
deleted file mode 100644
index 0f748c5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{Table, TableConfig}
-
-/**
-  * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
-  * [[ExecutionEnvironment]].
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataSet]] to a [[Table]]
-  * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataSet]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @param execEnv The Java batch [[ExecutionEnvironment]] of the TableEnvironment.
-  * @param config The configuration of the TableEnvironment.
-  */
-class BatchTableEnvironment(
-    execEnv: ExecutionEnvironment,
-    config: TableConfig)
-  extends org.apache.flink.api.table.BatchTableEnvironment(execEnv, config) {
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]].
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T]): Table = {
-
-    val name = createUniqueTableName()
-    registerDataSetInternal(name, dataSet)
-    scan(name)
-  }
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
-    *
-    * Example:
-    *
-    * {{{
-    *   DataSet<Tuple2<String, Long>> set = ...
-    *   Table tab = tableEnv.fromDataSet(set, "a, b")
-    * }}}
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @param fields The field names of the resulting [[Table]].
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
-    val exprs = ExpressionParser
-      .parseExpressionList(fields)
-      .toArray
-
-    val name = createUniqueTableName()
-    registerDataSetInternal(name, dataSet, exprs)
-    scan(name)
-  }
-
-  /**
-    * Registers the given [[DataSet]] as table in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @tparam T The type of the [[DataSet]] to register.
-    */
-  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
-
-    checkValidTableName(name)
-    registerDataSetInternal(name, dataSet)
-  }
-
-  /**
-    * Registers the given [[DataSet]] as table with specified field names in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * Example:
-    *
-    * {{{
-    *   DataSet<Tuple2<String, Long>> set = ...
-    *   tableEnv.registerDataSet("myTable", set, "a, b")
-    * }}}
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @param fields The field names of the registered table.
-    * @tparam T The type of the [[DataSet]] to register.
-    */
-  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): Unit = {
-    val exprs = ExpressionParser
-      .parseExpressionList(fields)
-      .toArray
-
-    checkValidTableName(name)
-    registerDataSetInternal(name, dataSet, exprs)
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @param clazz The class of the type of the resulting [[DataSet]].
-    * @tparam T The type of the resulting [[DataSet]].
-    * @return The converted [[DataSet]].
-    */
-  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
-    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
-    * @tparam T The type of the resulting [[DataSet]].
-    * @return The converted [[DataSet]].
-    */
-  def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
-    translate[T](table)(typeInfo)
-  }
-
-  /**
-    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in Table API and SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register.
-    * @tparam T The type of the output row.
-    */
-  def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
-    implicit val typeInfo: TypeInformation[T] = TypeExtractor
-      .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
-      .asInstanceOf[TypeInformation[T]]
-
-    registerTableFunctionInternal[T](name, tf)
-  }
-}
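
A hedged round-trip sketch for the Java bridge removed above (pre-refactor package names; written in Scala for consistency with the other sketches here, which works because the Java environment is callable from Scala). Table and field names are illustrative, and the direct constructor call stands in for whatever factory this revision offers.

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.table.BatchTableEnvironment
    import org.apache.flink.api.table.TableConfig

    val env  = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = new BatchTableEnvironment(env, new TableConfig)

    // tuple fields are mapped by position to the names "a, b"
    val input = env.fromElements(JTuple2.of("hello", java.lang.Long.valueOf(1L)))
    val table = tEnv.fromDataSet(input, "a, b")

    // ...or register the DataSet for use in SQL queries
    tEnv.registerDataSet("myTable", input, "a, b")

    // map the table back to a DataSet by position
    val back = tEnv.toDataSet(table, classOf[JTuple2[String, java.lang.Long]])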

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
deleted file mode 100644
index 3218ced..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * The [[org.apache.flink.api.table.TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @param execEnv The Java [[StreamExecutionEnvironment]] of the TableEnvironment.
-  * @param config The configuration of the TableEnvironment.
-  */
-class StreamTableEnvironment(
-    execEnv: StreamExecutionEnvironment,
-    config: TableConfig)
-  extends org.apache.flink.api.table.StreamTableEnvironment(execEnv, config) {
-
-  /**
-    * Converts the given [[DataStream]] into a [[Table]].
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the
-    * [[DataStream]].
-    *
-    * @param dataStream The [[DataStream]] to be converted.
-    * @tparam T The type of the [[DataStream]].
-    * @return The converted [[Table]].
-    */
-  def fromDataStream[T](dataStream: DataStream[T]): Table = {
-
-    val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream)
-    ingest(name)
-  }
-
-  /**
-    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
-    *
-    * Example:
-    *
-    * {{{
-    *   DataStream<Tuple2<String, Long>> stream = ...
-    *   Table tab = tableEnv.fromDataStream(stream, "a, b")
-    * }}}
-    *
-    * @param dataStream The [[DataStream]] to be converted.
-    * @param fields The field names of the resulting [[Table]].
-    * @tparam T The type of the [[DataStream]].
-    * @return The converted [[Table]].
-    */
-  def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
-    val exprs = ExpressionParser
-      .parseExpressionList(fields)
-      .toArray
-
-    val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream, exprs)
-    ingest(name)
-  }
-
-  /**
-    * Registers the given [[DataStream]] as table in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived
-    * from the type of the [[DataStream]].
-    *
-    * @param name The name under which the [[DataStream]] is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register.
-    * @tparam T The type of the [[DataStream]] to register.
-    */
-  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
-
-    checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream)
-  }
-
-  /**
-    * Registers the given [[DataStream]] as table with specified field names in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * Example:
-    *
-    * {{{
-    *   DataStream<Tuple2<String, Long>> set = ...
-    *   tableEnv.registerDataStream("myTable", set, "a, b")
-    * }}}
-    *
-    * @param name The name under which the [[DataStream]] is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register.
-    * @param fields The field names of the registered table.
-    * @tparam T The type of the [[DataStream]] to register.
-    */
-  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
-    val exprs = ExpressionParser
-      .parseExpressionList(fields)
-      .toArray
-
-    checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream, exprs)
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @param clazz The class of the type of the resulting [[DataStream]].
-    * @tparam T The type of the resulting [[DataStream]].
-    * @return The converted [[DataStream]].
-    */
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
-    * @tparam T The type of the resulting [[DataStream]].
-    * @return The converted [[DataStream]].
-    */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
-    translate[T](table)(typeInfo)
-  }
-
-  /**
-    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in Table API and SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register.
-    * @tparam T The type of the output row.
-    */
-  def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
-    implicit val typeInfo: TypeInformation[T] = TypeExtractor
-      .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
-      .asInstanceOf[TypeInformation[T]]
-
-    registerTableFunctionInternal[T](name, tf)
-  }
-}
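
Similarly for the streaming bridge, a hedged sketch (not part of this commit) that registers a hypothetical TableFunction and converts a DataStream to a Table and back, using only the methods shown above:

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.table.StreamTableEnvironment
    import org.apache.flink.api.table.TableConfig
    import org.apache.flink.api.table.functions.TableFunction
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.types.Row

    // hypothetical table function, for illustration only
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = s.split(" ").foreach(collect)
    }

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = new StreamTableEnvironment(env, new TableConfig)
    tEnv.registerFunction("split", new Split)

    val stream = env.fromElements(JTuple2.of("hello world", java.lang.Long.valueOf(1L)))
    val table  = tEnv.fromDataStream(stream, "line, cnt")
    val rows   = tEnv.toDataStream(table, classOf[Row])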

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
deleted file mode 100644
index 3bbe753..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
-  * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
-  * elements into intervals of 5 minutes.
-  */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
-    * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
-    * elements into intervals of 5 minutes.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
-    */
-  def over(size: String): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
-  * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
-  * a specified slide interval. If the slide interval is smaller than the window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
-  * of 15 minutes and is evaluated every 5 minutes. Each element is contained in three consecutive
-  * window evaluations.
-  */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
-    * elements of 15 minutes and is evaluated every 5 minutes. Each element is contained in three
-    * consecutive window evaluations.
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: String): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
-  * Helper class for creating a session window. The boundaries of session windows are defined by
-  * intervals of inactivity, i.e., a session window closes if no event appears for a defined
-  * gap period.
-  */
-object Session {
-
-  /**
-    * Creates a session window. The boundaries of session windows are defined by
-    * intervals of inactivity, i.e., a session window closes if no event appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
-    *            closing the session window.
-    * @return a session window
-    */
-  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
-}
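
The factories above only build window specifications, which are then applied to a table via the Table API's windowing clause. A brief sketch (not part of this commit) of constructing each kind, using the string-based interval syntax of this revision, e.g. "10.minutes" for a time interval or "10.rows" for a row-count interval:

    import org.apache.flink.api.java.table.{Session, Slide, Tumble}

    val tumbling = Tumble.over("10.minutes")      // TumblingWindow
    val sliding  = Slide.over("15.minutes")       // SlideWithSize, still needs a slide interval
    val session  = Session.withGap("30.minutes")  // SessionWindow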

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
deleted file mode 100644
index 26fe51e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-
-import scala.reflect.ClassTag
-
-/**
-  * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala batch [[DataSet]]
-  * [[ExecutionEnvironment]].
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataSet]] to a [[Table]]
-  * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataSet]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @param execEnv The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
-  * @param config The configuration of the TableEnvironment.
-  */
-class BatchTableEnvironment(
-    execEnv: ExecutionEnvironment,
-    config: TableConfig)
-  extends org.apache.flink.api.table.BatchTableEnvironment(execEnv.getJavaEnv, config) {
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]].
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T]): Table = {
-
-    val name = createUniqueTableName()
-    registerDataSetInternal(name, dataSet.javaSet)
-    scan(name)
-  }
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   val tab: Table = tableEnv.fromDataSet(set, 'a, 'b)
-    * }}}
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @param fields The field names of the resulting [[Table]].
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table = {
-
-    val name = createUniqueTableName()
-    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
-    scan(name)
-  }
-
-  /**
-    * Registers the given [[DataSet]] as table in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @tparam T The type of the [[DataSet]] to register.
-    */
-  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
-
-    checkValidTableName(name)
-    registerDataSetInternal(name, dataSet.javaSet)
-  }
-
-  /**
-    * Registers the given [[DataSet]] as table with specified field names in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   tableEnv.registerDataSet("myTable", set, 'a, 'b)
-    * }}}
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @param fields The field names of the registered table.
-    * @tparam T The type of the [[DataSet]] to register.
-    */
-  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit = {
-
-    checkValidTableName(name)
-    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @tparam T The type of the resulting [[DataSet]].
-    * @return The converted [[DataSet]].
-    */
-  def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
-    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
-  }
-
-  /**
-    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in Table API and SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register.
-    * @tparam T The type of the output row.
-    */
-  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
-    registerTableFunctionInternal(name, tf)
-  }
-}
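
For comparison with the Java bridge, a hedged sketch (not part of this commit) of the Scala environment, which takes symbol expressions instead of field-name strings; the direct constructor call again stands in for the usual factory:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableConfig

    val env  = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = new BatchTableEnvironment(env, new TableConfig)

    val input = env.fromElements(("hello", 1L), ("world", 2L))
    val table = tEnv.fromDataSet(input, 'word, 'cnt)

    // the result type is given as a type parameter; its TypeInformation is implicit
    val back: DataSet[(String, Long)] = tEnv.toDataSet[(String, Long)](table)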

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
deleted file mode 100644
index ce437c3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.Expression
-
-/**
-  * Holds methods to convert a [[DataSet]] into a [[Table]].
-  *
-  * @param dataSet The [[DataSet]] to convert.
-  * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
-  * @tparam T The type of the [[DataSet]].
-  */
-class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
-
-  /**
-    * Converts the [[DataSet]] into a [[Table]].
-    *
-    * The field name of the new [[Table]] can be specified like this:
-    *
-    * {{{
-    *   val env = ExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
-    *
-    *   val set: DataSet[(String, Int)] = ...
-    *   val table = set.toTable(tEnv, 'name, 'amount)
-    * }}}
-    *
-    * If not explicitly specified, field names are automatically extracted from the type of
-    * the [[DataSet]].
-    *
-    * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
-    * @param fields The field names of the new [[Table]] (optional).
-    * @return The resulting [[Table]].
-    */
-  def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
-    if (fields.isEmpty) {
-      tableEnv.fromDataSet(dataSet)
-    } else {
-      tableEnv.fromDataSet(dataSet, fields: _*)
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
deleted file mode 100644
index 3b724cf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.DataStream
-
-/**
-  * Holds methods to convert a [[DataStream]] into a [[Table]].
-  *
-  * @param dataStream The [[DataStream]] to convert.
-  * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
-  * @tparam T The type of the [[DataStream]].
-  */
-class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
-
-  /**
-    * Converts the [[DataStream]] into a [[Table]].
-    *
-    * The field name of the new [[Table]] can be specified like this:
-    *
-    * {{{
-    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
-    *
-    *   val stream: DataStream[(String, Int)] = ...
-    *   val table = stream.toTable(tEnv, 'name, 'amount)
-    * }}}
-    *
-    * If not explicitly specified, field names are automatically extracted from the type of
-    * the [[DataStream]].
-    *
-    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
-    * @param fields The field names of the new [[Table]] (optional).
-    * @return The resulting [[Table]].
-    */
-  def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
-    if (fields.isEmpty) {
-      tableEnv.fromDataStream(dataStream)
-    } else {
-      tableEnv.fromDataStream(dataStream, fields:_*)
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
deleted file mode 100644
index 044ace8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
-
-import org.apache.flink.streaming.api.scala.asScalaStream
-
-/**
-  * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala [[StreamExecutionEnvironment]].
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
-  * @param config The configuration of the TableEnvironment.
-  */
-class StreamTableEnvironment(
-    execEnv: StreamExecutionEnvironment,
-    config: TableConfig)
-  extends org.apache.flink.api.table.StreamTableEnvironment(
-    execEnv.getWrappedStreamExecutionEnvironment,
-    config) {
-
-  /**
-    * Converts the given [[DataStream]] into a [[Table]].
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the
-    * [[DataStream]].
-    *
-    * @param dataStream The [[DataStream]] to be converted.
-    * @tparam T The type of the [[DataStream]].
-    * @return The converted [[Table]].
-    */
-  def fromDataStream[T](dataStream: DataStream[T]): Table = {
-
-    val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream)
-    ingest(name)
-  }
-
-  /**
-    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
-    *
-    * Example:
-    *
-    * {{{
-    *   val stream: DataStream[(String, Long)] = ...
-    *   val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
-    * }}}
-    *
-    * @param dataStream The [[DataStream]] to be converted.
-    * @param fields The field names of the resulting [[Table]].
-    * @tparam T The type of the [[DataStream]].
-    * @return The converted [[Table]].
-    */
-  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
-
-    val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
-    ingest(name)
-  }
-
-  /**
-    * Registers the given [[DataStream]] as table in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived
-    * from the type of the [[DataStream]].
-    *
-    * @param name The name under which the [[DataStream]] is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register.
-    * @tparam T The type of the [[DataStream]] to register.
-    */
-  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
-
-    checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream.javaStream)
-  }
-
-  /**
-    * Registers the given [[DataStream]] as table with specified field names in the
-    * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataStream[(String, Long)] = ...
-    *   tableEnv.registerDataStream("myTable", set, 'a, 'b)
-    * }}}
-    *
-    * @param name The name under which the [[DataStream]] is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register.
-    * @param fields The field names of the registered table.
-    * @tparam T The type of the [[DataStream]] to register.
-    */
-  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
-
-    checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
-  }
-
-  /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @tparam T The type of the resulting [[DataStream]].
-    * @return The converted [[DataStream]].
-    */
-  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
-    asScalaStream(translate(table))
-  }
-
-  /**
-    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register
-    */
-  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
-    registerTableFunctionInternal(name, tf)
-  }
-}
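
And the streaming counterpart, a hedged sketch (not part of this commit); the implicit TypeInformation for the result type comes from the streaming Scala API imports:

    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableConfig
    import org.apache.flink.streaming.api.scala._

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = new StreamTableEnvironment(env, new TableConfig)

    val stream = env.fromElements(("hello", 1L), ("world", 2L))
    val table  = tEnv.fromDataStream(stream, 'word, 'cnt)
    val back: DataStream[(String, Long)] = tEnv.toDataStream[(String, Long)](table)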

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
deleted file mode 100644
index 720dac0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.DataStream
-
-import org.apache.flink.api.table.{Table, TableException}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv}
-import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv}
-
-/**
-  * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
-  *
-  * @param table The table to convert.
-  */
-class TableConversions(table: Table) {
-
-  /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
-  def toDataSet[T: TypeInformation]: DataSet[T] = {
-
-    table.tableEnv match {
-      case tEnv: ScalaBatchTableEnv =>
-        tEnv.toDataSet(table)
-      case _ =>
-        throw new TableException(
-          "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
-    }
-  }
-
-  /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
-  def toDataStream[T: TypeInformation]: DataStream[T] = {
-
-    table.tableEnv match {
-      case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table)
-      case _ =>
-        throw new TableException(
-          "Only tables that originate from Scala DataStreams " +
-            "can be converted to Scala DataStreams.")
-    }
-  }
-
-}
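
A hedged usage sketch (not part of this commit), assuming the package object's implicit conversions (DataSet to DataSetConversions, Table to TableConversions; not shown in this hunk) are in scope via the wildcard import:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableConfig

    val env  = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = new BatchTableEnvironment(env, new TableConfig)

    val table = env.fromElements(("a", 1)).toTable(tEnv, 'name, 'amount)

    // dispatches on the table's environment; a table backed by a stream
    // environment would throw a TableException here
    val ds: DataSet[(String, Int)] = table.toDataSet[(String, Int)]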
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
deleted file mode 100644
index 823458a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.calcite.avatica.util.DateTimeUtils._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.expressions._
-import java.math.{BigDecimal => JBigDecimal}
-
-import scala.language.implicitConversions
-
-/**
- * These are all the operations that can be used to construct an [[Expression]] AST for expression
- * operations.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.table.expressions.ExpressionParser]].
- */
-trait ImplicitExpressionOperations {
-  private[flink] def expr: Expression
-
-  /**
-    * Enables literals on the left side of binary expressions.
-    *
-    * e.g. 12.toExpr % 'a
-    *
-    * @return expression
-    */
-  def toExpr: Expression = expr
-
-  def && (other: Expression) = And(expr, other)
-  def || (other: Expression) = Or(expr, other)
-
-  def > (other: Expression) = GreaterThan(expr, other)
-  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
-  def < (other: Expression) = LessThan(expr, other)
-  def <= (other: Expression) = LessThanOrEqual(expr, other)
-
-  def === (other: Expression) = EqualTo(expr, other)
-  def !== (other: Expression) = NotEqualTo(expr, other)
-
-  def unary_! = Not(expr)
-  def unary_- = UnaryMinus(expr)
-
-  def isNull = IsNull(expr)
-  def isNotNull = IsNotNull(expr)
-
-  /**
-    * Returns true if the given boolean expression is true. False otherwise (for null and false).
-    */
-  def isTrue = IsTrue(expr)
-
-  /**
-    * Returns true if the given boolean expression is false. False otherwise (for null and true).
-    */
-  def isFalse = IsFalse(expr)
-
-  /**
-    * Returns true if the given boolean expression is not true (for null and false). False otherwise.
-    */
-  def isNotTrue = IsNotTrue(expr)
-
-  /**
-    * Returns true if the given boolean expression is not false (for null and true). False otherwise.
-    */
-  def isNotFalse = IsNotFalse(expr)
-
-  def + (other: Expression) = Plus(expr, other)
-  def - (other: Expression) = Minus(expr, other)
-  def / (other: Expression) = Div(expr, other)
-  def * (other: Expression) = Mul(expr, other)
-  def % (other: Expression) = mod(other)
-
-  def sum = Sum(expr)
-  def min = Min(expr)
-  def max = Max(expr)
-  def count = Count(expr)
-  def avg = Avg(expr)
-
-  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
-  /**
-    * Specifies a name for an expression, i.e., a field.
-    *
-    * @param name name for one field
-    * @param extraNames additional names if the expression expands to multiple fields
-    * @return field with an alias
-    */
-  def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
-
-  def asc = Asc(expr)
-  def desc = Desc(expr)
-
-  /**
-    * Returns the start time of a window when applied on a window reference.
-    */
-  def start = WindowStart(expr)
-
-  /**
-    * Returns the end time of a window when applied on a window reference.
-    */
-  def end = WindowEnd(expr)
-
-  /**
-    * Ternary conditional operator that decides which of two other expressions should be evaluated
-    * based on an evaluated boolean condition.
-    *
-    * e.g. (42 > 5).?("A", "B") leads to "A"
-    *
-    * @param ifTrue expression to be evaluated if condition holds
-    * @param ifFalse expression to be evaluated if condition does not hold
-    */
-  def ?(ifTrue: Expression, ifFalse: Expression) = {
-    If(expr, ifTrue, ifFalse)
-  }
-
-  // scalar functions
-
-  /**
-    * Calculates the remainder of dividing the given number by another one.
-    */
-  def mod(other: Expression) = Mod(expr, other)
-
-  /**
-    * Calculates Euler's number raised to the given power.
-    */
-  def exp() = Exp(expr)
-
-  /**
-    * Calculates the base 10 logarithm of the given value.
-    */
-  def log10() = Log10(expr)
-
-  /**
-    * Calculates the natural logarithm of the given value.
-    */
-  def ln() = Ln(expr)
-
-  /**
-    * Calculates the given number raised to the power of the other value.
-    */
-  def power(other: Expression) = Power(expr, other)
-
-  /**
-    * Calculates the square root of a given value.
-    */
-  def sqrt() = Sqrt(expr)
-
-  /**
-    * Calculates the absolute value of the given value.
-    */
-  def abs() = Abs(expr)
-
-  /**
-    * Calculates the largest integer less than or equal to a given number.
-    */
-  def floor() = Floor(expr)
-
-  /**
-    * Calculates the smallest integer greater than or equal to a given number.
-    */
-  def ceil() = Ceil(expr)
-
-  // String operations
-
-  /**
-    * Creates a substring of the given string at the given index for a given length.
-    *
-    * @param beginIndex first character of the substring (starting at 1, inclusive)
-    * @param length number of characters of the substring
-    * @return substring
-    */
-  def substring(beginIndex: Expression, length: Expression) =
-    Substring(expr, beginIndex, length)
-
-  /**
-    * Creates a substring of the given string beginning at the given index to the end.
-    *
-    * @param beginIndex first character of the substring (starting at 1, inclusive)
-    * @return substring
-    */
-  def substring(beginIndex: Expression) =
-    new Substring(expr, beginIndex)
-
-  /**
-    * Removes leading and/or trailing characters from the given string.
-    *
-    * @param removeLeading if true, remove leading characters (default: true)
-    * @param removeTrailing if true, remove trailing characters (default: true)
-    * @param character string containing the character (default: " ")
-    * @return trimmed string
-    */
-  def trim(
-      removeLeading: Boolean = true,
-      removeTrailing: Boolean = true,
-      character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
-    if (removeLeading && removeTrailing) {
-      Trim(TrimMode.BOTH, character, expr)
-    } else if (removeLeading) {
-      Trim(TrimMode.LEADING, character, expr)
-    } else if (removeTrailing) {
-      Trim(TrimMode.TRAILING, character, expr)
-    } else {
-      expr
-    }
-  }
-
-  /**
-    * Returns the length of a string.
-    */
-  def charLength() = CharLength(expr)
-
-  /**
-    * Returns all of the characters in a string in upper case using the rules of
-    * the default locale.
-    */
-  def upperCase() = Upper(expr)
-
-  /**
-    * Returns all of the characters in a string in lower case using the rules of
-    * the default locale.
-    */
-  def lowerCase() = Lower(expr)
-
-  /**
-    * Converts the initial letter of each word in a string to uppercase.
-    * Assumes a string containing only [A-Za-z0-9]; everything else is treated as whitespace.
-    */
-  def initCap() = InitCap(expr)
-
-  /**
-    * Returns true if a string matches the specified LIKE pattern.
-    *
-    * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
-    */
-  def like(pattern: Expression) = Like(expr, pattern)
-
-  /**
-    * Returns true if a string matches the specified SQL regex pattern.
-    *
-    * e.g. "A+" matches all strings that consist of at least one A
-    */
-  def similar(pattern: Expression) = Similar(expr, pattern)
-
-  /**
-    * Returns the position of a string in another string, starting at 1.
-    * Returns 0 if the string could not be found.
-    *
-    * e.g. "a".position("bbbbba") leads to 6
-    */
-  def position(haystack: Expression) = Position(expr, haystack)
-
-  /**
-    * Replaces a substring of a string with another string, beginning at a position (starting at 1).
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
-    */
-  def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting)
-
-  /**
-    * Replaces a substring of a string with another string, beginning at a position (starting at 1).
-    * The length specifies how many characters should be removed.
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
-    */
-  def overlay(newString: Expression, starting: Expression, length: Expression) =
-    Overlay(expr, newString, starting, length)
-
-  // Temporal operations
-
-  /**
-    * Parses a date string in the form "yyyy-mm-dd" to a SQL Date.
-    */
-  def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
-
-  /**
-    * Parses a time string in the form "hh:mm:ss" to a SQL Time.
-    */
-  def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
-
-  /**
-    * Parses a timestamp string in the form "yyyy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
-    */
-  def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
-
-  /**
-    * Extracts parts of a time point or time interval. Returns the part as a long value.
-    *
-    * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
-    */
-  def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
-
-  /**
-    * Returns the quarter of a year from a SQL date.
-    *
-    * e.g. "1994-09-27".toDate.quarter() leads to 3
-    */
-  def quarter() = Quarter(expr)
-
-  /**
-    * Rounds down a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
-    */
-  def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
-
-  /**
-    * Rounds up a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
-    */
-  def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
-
-  // Interval types
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def year = toMonthInterval(expr, 12)
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def years = year
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def month = toMonthInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def months = month
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def day = toMilliInterval(expr, MILLIS_PER_DAY)
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def days = day
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hours = hour
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minutes = minute
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def second = toMilliInterval(expr, MILLIS_PER_SECOND)
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def seconds = second
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def milli = toMilliInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def millis = milli
-
-  // row interval type
-
-  /**
-    * Creates an interval of rows.
-    *
-    * @return interval of rows
-    */
-  def rows = toRowInterval(expr)
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
-    * returns its value.
-    *
-    * @param name name of the field (similar to Flink's field expressions)
-    * @return value of the field
-    */
-  def get(name: String) = GetCompositeField(expr, name)
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
-    * returns its value.
-    *
-    * @param index position of the field
-    * @return value of the field
-    */
-  def get(index: Int) = GetCompositeField(expr, index)
-
-  /**
-    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
-    * into a flat representation where every subtype is a separate field.
-    */
-  def flatten() = Flattening(expr)
-
-  /**
-    * Accesses the element of an array based on an index (starting at 1).
-    *
-    * @param index position of the element (starting at 1)
-    * @return value of the element
-    */
-  def at(index: Expression) = ArrayElementAt(expr, index)
-
-  /**
-    * Returns the number of elements of an array.
-    *
-    * @return number of elements
-    */
-  def cardinality() = ArrayCardinality(expr)
-
-  /**
-    * Returns the sole element of an array with a single element. Returns null if the array is
-    * empty. Throws an exception if the array has more than one element.
-    *
-    * @return the first and only element of an array with a single element
-    */
-  def element() = ArrayElement(expr)
-}
-
-/**
- * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
- * to [[ImplicitExpressionOperations]].
- */
-trait ImplicitExpressionConversions {
-  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
-    def expr = e
-  }
-
-  implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations {
-    def expr = UnresolvedFieldReference(s.name)
-  }
-
-  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
-    def expr = Literal(l)
-  }
-
-  implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations {
-    def expr = Literal(b)
-  }
-
-  implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations {
-    def expr = Literal(s)
-  }
-
-  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
-    def expr = Literal(i)
-  }
-
-  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
-    def expr = Literal(f)
-  }
-
-  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
-    def expr = Literal(d)
-  }
-
-  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
-    def expr = Literal(str)
-  }
-
-  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
-    def expr = Literal(bool)
-  }
-
-  implicit class LiteralJavaDecimalExpression(javaDecimal: java.math.BigDecimal)
-      extends ImplicitExpressionOperations {
-    def expr = Literal(javaDecimal)
-  }
-
-  implicit class LiteralScalaDecimalExpression(scalaDecimal: scala.math.BigDecimal)
-      extends ImplicitExpressionOperations {
-    def expr = Literal(scalaDecimal.bigDecimal)
-  }
-
-  implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations {
-    def expr = Literal(sqlDate)
-  }
-
-  implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations {
-    def expr = Literal(sqlTime)
-  }
-
-  implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp)
-      extends ImplicitExpressionOperations {
-    def expr = Literal(sqlTimestamp)
-  }
-
-  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
-  implicit def byte2Literal(b: Byte): Expression = Literal(b)
-  implicit def short2Literal(s: Short): Expression = Literal(s)
-  implicit def int2Literal(i: Int): Expression = Literal(i)
-  implicit def long2Literal(l: Long): Expression = Literal(l)
-  implicit def double2Literal(d: Double): Expression = Literal(d)
-  implicit def float2Literal(d: Float): Expression = Literal(d)
-  implicit def string2Literal(str: String): Expression = Literal(str)
-  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
-  implicit def javaDec2Literal(javaDec: JBigDecimal): Expression = Literal(javaDec)
-  implicit def scalaDec2Literal(scalaDec: BigDecimal): Expression =
-    Literal(scalaDec.bigDecimal)
-  implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
-  implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
-  implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression =
-    Literal(sqlTimestamp)
-  implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
-
-/**
-  * Returns the current SQL date in UTC time zone.
-  */
-object currentDate {
-
-  /**
-    * Returns the current SQL date in UTC time zone.
-    */
-  def apply(): Expression = {
-    CurrentDate()
-  }
-}
-
-/**
-  * Returns the current SQL time in UTC time zone.
-  */
-object currentTime {
-
-  /**
-    * Returns the current SQL time in UTC time zone.
-    */
-  def apply(): Expression = {
-    CurrentTime()
-  }
-}
-
-/**
-  * Returns the current SQL timestamp in UTC time zone.
-  */
-object currentTimestamp {
-
-  /**
-    * Returns the current SQL timestamp in UTC time zone.
-    */
-  def apply(): Expression = {
-    CurrentTimestamp()
-  }
-}
-
-/**
-  * Returns the current SQL time in local time zone.
-  */
-object localTime {
-
-  /**
-    * Returns the current SQL time in local time zone.
-    */
-  def apply(): Expression = {
-    LocalTime()
-  }
-}
-
-/**
-  * Returns the current SQL timestamp in local time zone.
-  */
-object localTimestamp {
-
-  /**
-    * Returns the current SQL timestamp in local time zone.
-    */
-  def apply(): Expression = {
-    LocalTimestamp()
-  }
-}
-
-/**
-  * Determines whether two anchored time intervals overlap. Time point and temporal are
-  * transformed into a range defined by two time points (start, end).
-  *
-  * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
-  *
-  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
-  */
-object temporalOverlaps {
-
-  /**
-    * Determines whether two anchored time intervals overlap. Time point and temporal are
-    * transformed into a range defined by two time points (start, end).
-    *
-    * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
-    *
-    * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
-    */
-  def apply(
-      leftTimePoint: Expression,
-      leftTemporal: Expression,
-      rightTimePoint: Expression,
-      rightTemporal: Expression): Expression = {
-    TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
-  }
-}
-
-/**
-  * Creates an array of literals. The array will be an array of objects (not primitives).
-  */
-object array {
-
-  /**
-    * Creates an array of literals. The array will be an array of objects (not primitives).
-    */
-  def apply(head: Expression, tail: Expression*): Expression = {
-    ArrayConstructor(head +: tail.toSeq)
-  }
-}
-
-// scalastyle:on object.name
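
For reference, a minimal usage sketch of the expression DSL deleted above, written against
the pre-refactoring org.apache.flink.api.scala.table API. The table `orders` and its fields
are illustrative and not part of the commit:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    // Literals such as 100 and "expensive" are lifted to expressions by the
    // implicit conversions in ImplicitExpressionConversions above.
    val result = orders.select(
      'name.trim().upperCase(),                // string operations
      ('price > 100).?("expensive", "cheap"),  // ternary conditional operator
      'price.abs().floor(),                    // scalar functions
      'date.toDate.quarter(),                  // temporal operations
      array("a", "b", "c").cardinality())      // array construction and access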

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
deleted file mode 100644
index 16fda5b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
-  * Helper object for creating a tumbling window. Tumbling windows are consecutive,
-  * non-overlapping windows of a specified fixed length. For example, a tumbling window of
-  * 5 minutes groups elements into 5 minute intervals.
-  */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
-    * windows. For example, a tumbling window of 5 minutes groups
-    * elements into 5 minute intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
-    */
-  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
-  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
-  * a specified slide interval. If the slide interval is smaller than the window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple windows.
-  *
-  * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups
-  * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
-  * consecutive window evaluations.
-  */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple windows.
-    *
-    * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is contained in
-    * three consecutive window evaluations.
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
-  * Helper object for creating a session window. The boundaries of session windows are defined
-  * by intervals of inactivity, i.e., a session window closes if no event appears for a defined
-  * gap period.
-  */
-object Session {
-
-  /**
-    * Creates a session window. The boundaries of session windows are defined by
-    * intervals of inactivity, i.e., a session window closes if no event appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
-    *            closing the session window.
-    * @return a session window
-    */
-  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
deleted file mode 100644
index 1e8bf39..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.Table
-import org.apache.flink.types.Row
-import scala.language.implicitConversions
-import org.apache.flink.streaming.api.scala._
-
-import scala.reflect.ClassTag
-
-/**
-  * == Table API (Scala) ==
-  *
-  * Importing this package with:
-  *
-  * {{{
-  *   import org.apache.flink.api.scala.table._
-  * }}}
-  *
-  * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
-  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
-  * a look at [[Table]] to see which operations are supported and
-  * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
-  * expression can be specified.
-  *
-  * When writing a query you can use Scala Symbols to refer to field names. One would
-  * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
-  * Scala literal to an Expression literal; in those cases, use `Literal`, as in `Literal(3)`.
-  *
-  * Example:
-  *
-  * {{{
-  *   import org.apache.flink.api.scala._
-  *   import org.apache.flink.api.scala.table._
-  *
-  *   val env = ExecutionEnvironment.getExecutionEnvironment
-  *   val tEnv = TableEnvironment.getTableEnvironment(env)
-  *
-  *   val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
-  *   val result = input
-  *         .toTable(tEnv, 'word, 'count)
-  *         .groupBy('word)
-  *         .select('word, 'count.avg)
-  *
-  *   result.print()
-  * }}}
-  *
-  */
-package object table extends ImplicitExpressionConversions {
-
-  implicit def table2TableConversions(table: Table): TableConversions = {
-    new TableConversions(table)
-  }
-
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType())
-  }
-
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val tableEnv = table.tableEnv.asInstanceOf[BatchTableEnvironment]
-    tableEnv.toDataSet[Row](table)
-  }
-
-  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](set, set.dataType.asInstanceOf[CompositeType[T]])
-  }
-
-  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
-    val tableEnv = table.tableEnv.asInstanceOf[StreamTableEnvironment]
-    tableEnv.toDataStream[Row](table)
-  }
-
-}
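
For reference, a brief sketch of the implicit conversions this package object contributes,
assuming the pre-refactoring imports. Names such as `input`, `filtered` and `rows` are
illustrative:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.types.Row

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(("Hello", 2), ("Ciao", 3))

    // dataSet2DataSetConversions makes toTable() available on any DataSet ...
    val filtered = input.toTable(tEnv, 'word, 'count).where('count > 2)

    // ... and table2RowDataSet converts the Table back to a DataSet[Row]
    // without an explicit conversion call.
    val rows: DataSet[Row] = filtered
    rows.print()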