You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:10 UTC
[41/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
new file mode 100644
index 0000000..f13c042
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class PlanJsonParser {
+
+ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ //not every node is same, ignore the unknown field
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ PlanTree tree = objectMapper.readValue(t, PlanTree.class);
+ LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ int tabCount = 0;
+
+ for (int index = 0; index < tree.getNodes().size(); index++) {
+ Node tempNode = tree.getNodes().get(index);
+
+ //input with operation such as join or union is coordinate, keep the same indent
+ if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
+ tabCount = map.get(tempNode.getPact());
+ }
+ else {
+ map.put(tempNode.getPact(), tabCount);
+ }
+
+ printTab(tabCount, pw);
+ pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
+
+ printTab(tabCount + 1, pw);
+ String content = tempNode.getContents();
+
+ //drop the hashcode of object instance
+ int dele = tempNode.getContents().indexOf("@");
+ if (dele > -1) {
+ content = tempNode.getContents().substring(0, dele);
+ }
+
+ //replace with certain content if node is dataSource to pass
+ //unit tests, because java and scala use different api to
+ //get input element
+ if (tempNode.getPact().equals("Data Source")) {
+ content = "collect elements with CollectionInputFormat";
+ }
+ pw.print("content : " + content + "\n");
+
+ List<Predecessors> predecessors = tempNode.getPredecessors();
+ if (predecessors != null) {
+ printTab(tabCount + 1, pw);
+ pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+
+ String mode = predecessors.get(0).getExchange_mode();
+ if (mode != null) {
+ printTab(tabCount + 1, pw);
+ pw.print("exchange_mode : " + mode + "\n");
+ }
+ }
+
+ if (tempNode.getDriver_strategy() != null) {
+ printTab(tabCount + 1, pw);
+ pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+ }
+
+ if (tempNode.getGlobal_properties() != null) {
+ printTab(tabCount + 1, pw);
+ pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+ + tempNode.getGlobal_properties().get(0).getValue() + "\n");
+ }
+
+ if (extended) {
+ List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+ for (int i = 1; i < globalProperties.size(); i++) {
+ printTab(tabCount + 1, pw);
+ pw.print(globalProperties.get(i).getName() + " : "
+ + globalProperties.get(i).getValue() + "\n");
+ }
+
+ List<LocalProperty> localProperties = tempNode.getLocal_properties();
+ for (int i = 0; i < localProperties.size(); i++) {
+ printTab(tabCount + 1, pw);
+ pw.print(localProperties.get(i).getName() + " : "
+ + localProperties.get(i).getValue() + "\n");
+ }
+
+ List<Estimates> estimates = tempNode.getEstimates();
+ for (int i = 0; i < estimates.size(); i++) {
+ printTab(tabCount + 1, pw);
+ pw.print(estimates.get(i).getName() + " : "
+ + estimates.get(i).getValue() + "\n");
+ }
+
+ List<Costs> costs = tempNode.getCosts();
+ for (int i = 0; i < costs.size(); i++) {
+ printTab(tabCount + 1, pw);
+ pw.print(costs.get(i).getName() + " : "
+ + costs.get(i).getValue() + "\n");
+ }
+
+ List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
+ for (int i = 0; i < compilerHintses.size(); i++) {
+ printTab(tabCount + 1, pw);
+ pw.print(compilerHintses.get(i).getName() + " : "
+ + compilerHintses.get(i).getValue() + "\n");
+ }
+ }
+ tabCount++;
+ pw.print("\n");
+ }
+ pw.close();
+ return sw.toString();
+ }
+
+ private static void printTab(int tabCount, PrintWriter pw) {
+ for (int i = 0; i < tabCount; i++)
+ pw.print("\t");
+ }
+}
+
+class PlanTree {
+ private List<Node> nodes;
+
+ public List<Node> getNodes() {
+ return nodes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
deleted file mode 100644
index 0f748c5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{Table, TableConfig}
-
-/**
- * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]]
- * [[ExecutionEnvironment]].
- *
- * A TableEnvironment can be used to:
- * - convert a [[DataSet]] to a [[Table]]
- * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataSet]]
- * - explain the AST and execution plan of a [[Table]]
- *
- * @param execEnv The Java batch [[ExecutionEnvironment]] of the TableEnvironment.
- * @param config The configuration of the TableEnvironment.
- */
-class BatchTableEnvironment(
- execEnv: ExecutionEnvironment,
- config: TableConfig)
- extends org.apache.flink.api.table.BatchTableEnvironment(execEnv, config) {
-
- /**
- * Converts the given [[DataSet]] into a [[Table]].
- *
- * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
- *
- * @param dataSet The [[DataSet]] to be converted.
- * @tparam T The type of the [[DataSet]].
- * @return The converted [[Table]].
- */
- def fromDataSet[T](dataSet: DataSet[T]): Table = {
-
- val name = createUniqueTableName()
- registerDataSetInternal(name, dataSet)
- scan(name)
- }
-
- /**
- * Converts the given [[DataSet]] into a [[Table]] with specified field names.
- *
- * Example:
- *
- * {{{
- * DataSet<Tuple2<String, Long>> set = ...
- * Table tab = tableEnv.fromDataSet(set, "a, b")
- * }}}
- *
- * @param dataSet The [[DataSet]] to be converted.
- * @param fields The field names of the resulting [[Table]].
- * @tparam T The type of the [[DataSet]].
- * @return The converted [[Table]].
- */
- def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
- val exprs = ExpressionParser
- .parseExpressionList(fields)
- .toArray
-
- val name = createUniqueTableName()
- registerDataSetInternal(name, dataSet, exprs)
- scan(name)
- }
-
- /**
- * Registers the given [[DataSet]] as table in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
- *
- * @param name The name under which the [[DataSet]] is registered in the catalog.
- * @param dataSet The [[DataSet]] to register.
- * @tparam T The type of the [[DataSet]] to register.
- */
- def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
-
- checkValidTableName(name)
- registerDataSetInternal(name, dataSet)
- }
-
- /**
- * Registers the given [[DataSet]] as table with specified field names in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * Example:
- *
- * {{{
- * DataSet<Tuple2<String, Long>> set = ...
- * tableEnv.registerDataSet("myTable", set, "a, b")
- * }}}
- *
- * @param name The name under which the [[DataSet]] is registered in the catalog.
- * @param dataSet The [[DataSet]] to register.
- * @param fields The field names of the registered table.
- * @tparam T The type of the [[DataSet]] to register.
- */
- def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): Unit = {
- val exprs = ExpressionParser
- .parseExpressionList(fields)
- .toArray
-
- checkValidTableName(name)
- registerDataSetInternal(name, dataSet, exprs)
- }
-
- /**
- * Converts the given [[Table]] into a [[DataSet]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @param clazz The class of the type of the resulting [[DataSet]].
- * @tparam T The type of the resulting [[DataSet]].
- * @return The converted [[DataSet]].
- */
- def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
- translate[T](table)(TypeExtractor.createTypeInfo(clazz))
- }
-
- /**
- * Converts the given [[Table]] into a [[DataSet]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
- * @tparam T The type of the resulting [[DataSet]].
- * @return The converted [[DataSet]].
- */
- def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
- translate[T](table)(typeInfo)
- }
-
- /**
- * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
- * Registered functions can be referenced in Table API and SQL queries.
- *
- * @param name The name under which the function is registered.
- * @param tf The TableFunction to register.
- * @tparam T The type of the output row.
- */
- def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
- implicit val typeInfo: TypeInformation[T] = TypeExtractor
- .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
- .asInstanceOf[TypeInformation[T]]
-
- registerTableFunctionInternal[T](name, tf)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
deleted file mode 100644
index 3218ced..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.table.expressions.ExpressionParser
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
- * The [[org.apache.flink.api.table.TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
- *
- * A TableEnvironment can be used to:
- * - convert a [[DataStream]] to a [[Table]]
- * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataStream]]
- * - explain the AST and execution plan of a [[Table]]
- *
- * @param execEnv The Java [[StreamExecutionEnvironment]] of the TableEnvironment.
- * @param config The configuration of the TableEnvironment.
- */
-class StreamTableEnvironment(
- execEnv: StreamExecutionEnvironment,
- config: TableConfig)
- extends org.apache.flink.api.table.StreamTableEnvironment(execEnv, config) {
-
- /**
- * Converts the given [[DataStream]] into a [[Table]].
- *
- * The field names of the [[Table]] are automatically derived from the type of the
- * [[DataStream]].
- *
- * @param dataStream The [[DataStream]] to be converted.
- * @tparam T The type of the [[DataStream]].
- * @return The converted [[Table]].
- */
- def fromDataStream[T](dataStream: DataStream[T]): Table = {
-
- val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream)
- ingest(name)
- }
-
- /**
- * Converts the given [[DataStream]] into a [[Table]] with specified field names.
- *
- * Example:
- *
- * {{{
- * DataStream<Tuple2<String, Long>> stream = ...
- * Table tab = tableEnv.fromDataStream(stream, "a, b")
- * }}}
- *
- * @param dataStream The [[DataStream]] to be converted.
- * @param fields The field names of the resulting [[Table]].
- * @tparam T The type of the [[DataStream]].
- * @return The converted [[Table]].
- */
- def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
- val exprs = ExpressionParser
- .parseExpressionList(fields)
- .toArray
-
- val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream, exprs)
- ingest(name)
- }
-
- /**
- * Registers the given [[DataStream]] as table in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * The field names of the [[Table]] are automatically derived
- * from the type of the [[DataStream]].
- *
- * @param name The name under which the [[DataStream]] is registered in the catalog.
- * @param dataStream The [[DataStream]] to register.
- * @tparam T The type of the [[DataStream]] to register.
- */
- def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
-
- checkValidTableName(name)
- registerDataStreamInternal(name, dataStream)
- }
-
- /**
- * Registers the given [[DataStream]] as table with specified field names in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * Example:
- *
- * {{{
- * DataStream<Tuple2<String, Long>> set = ...
- * tableEnv.registerDataStream("myTable", set, "a, b")
- * }}}
- *
- * @param name The name under which the [[DataStream]] is registered in the catalog.
- * @param dataStream The [[DataStream]] to register.
- * @param fields The field names of the registered table.
- * @tparam T The type of the [[DataStream]] to register.
- */
- def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
- val exprs = ExpressionParser
- .parseExpressionList(fields)
- .toArray
-
- checkValidTableName(name)
- registerDataStreamInternal(name, dataStream, exprs)
- }
-
- /**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @param clazz The class of the type of the resulting [[DataStream]].
- * @tparam T The type of the resulting [[DataStream]].
- * @return The converted [[DataStream]].
- */
- def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
- translate[T](table)(TypeExtractor.createTypeInfo(clazz))
- }
-
- /**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
- * @tparam T The type of the resulting [[DataStream]].
- * @return The converted [[DataStream]].
- */
- def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
- translate[T](table)(typeInfo)
- }
-
- /**
- * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
- * Registered functions can be referenced in Table API and SQL queries.
- *
- * @param name The name under which the function is registered.
- * @param tf The TableFunction to register.
- * @tparam T The type of the output row.
- */
- def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
- implicit val typeInfo: TypeInformation[T] = TypeExtractor
- .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
- .asInstanceOf[TypeInformation[T]]
-
- registerTableFunctionInternal[T](name, tf)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
deleted file mode 100644
index 3bbe753..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/groupWindows.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
- * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
- * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
- * elements in 5 minutes intervals.
- */
-object Tumble {
-
- /**
- * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
- * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
- * elements in 5 minutes intervals.
- *
- * @param size the size of the window as time or row-count interval.
- * @return a tumbling window
- */
- def over(size: String): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
- * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
- * a specified slide interval. If the slide interval is smaller than the window size, sliding
- * windows are overlapping. Thus, an element can be assigned to multiple windows.
- *
- * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
- * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
- * window evaluations.
- */
-object Slide {
-
- /**
- * Creates a sliding window. Sliding windows have a fixed size and slide by
- * a specified slide interval. If the slide interval is smaller than the window size, sliding
- * windows are overlapping. Thus, an element can be assigned to multiple windows.
- *
- * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
- * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
- * consecutive window evaluations.
- *
- * @param size the size of the window as time or row-count interval
- * @return a partially specified sliding window
- */
- def over(size: String): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
- * Helper class for creating a session window. The boundary of session windows are defined by
- * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
- * gap period.
- */
-object Session {
-
- /**
- * Creates a session window. The boundary of session windows are defined by
- * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
- * gap period.
- *
- * @param gap specifies how long (as interval of milliseconds) to wait for new data before
- * closing the session window.
- * @return a session window
- */
- def withGap(gap: String): SessionWindow = new SessionWindow(gap)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
deleted file mode 100644
index 26fe51e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-
-import scala.reflect.ClassTag
-
-/**
- * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala batch [[DataSet]]
- * [[ExecutionEnvironment]].
- *
- * A TableEnvironment can be used to:
- * - convert a [[DataSet]] to a [[Table]]
- * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataSet]]
- * - explain the AST and execution plan of a [[Table]]
- *
- * @param execEnv The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
- * @param config The configuration of the TableEnvironment.
- */
-class BatchTableEnvironment(
- execEnv: ExecutionEnvironment,
- config: TableConfig)
- extends org.apache.flink.api.table.BatchTableEnvironment(execEnv.getJavaEnv, config) {
-
- /**
- * Converts the given [[DataSet]] into a [[Table]].
- *
- * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
- *
- * @param dataSet The [[DataSet]] to be converted.
- * @tparam T The type of the [[DataSet]].
- * @return The converted [[Table]].
- */
- def fromDataSet[T](dataSet: DataSet[T]): Table = {
-
- val name = createUniqueTableName()
- registerDataSetInternal(name, dataSet.javaSet)
- scan(name)
- }
-
- /**
- * Converts the given [[DataSet]] into a [[Table]] with specified field names.
- *
- * Example:
- *
- * {{{
- * val set: DataSet[(String, Long)] = ...
- * val tab: Table = tableEnv.fromDataSet(set, 'a, 'b)
- * }}}
- *
- * @param dataSet The [[DataSet]] to be converted.
- * @param fields The field names of the resulting [[Table]].
- * @tparam T The type of the [[DataSet]].
- * @return The converted [[Table]].
- */
- def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table = {
-
- val name = createUniqueTableName()
- registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
- scan(name)
- }
-
- /**
- * Registers the given [[DataSet]] as table in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
- *
- * @param name The name under which the [[DataSet]] is registered in the catalog.
- * @param dataSet The [[DataSet]] to register.
- * @tparam T The type of the [[DataSet]] to register.
- */
- def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
-
- checkValidTableName(name)
- registerDataSetInternal(name, dataSet.javaSet)
- }
-
- /**
- * Registers the given [[DataSet]] as table with specified field names in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * Example:
- *
- * {{{
- * val set: DataSet[(String, Long)] = ...
- * tableEnv.registerDataSet("myTable", set, 'a, 'b)
- * }}}
- *
- * @param name The name under which the [[DataSet]] is registered in the catalog.
- * @param dataSet The [[DataSet]] to register.
- * @param fields The field names of the registered table.
- * @tparam T The type of the [[DataSet]] to register.
- */
- def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit = {
-
- checkValidTableName(name)
- registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
- }
-
- /**
- * Converts the given [[Table]] into a [[DataSet]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @tparam T The type of the resulting [[DataSet]].
- * @return The converted [[DataSet]].
- */
- def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
- wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
- }
-
- /**
- * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
- * Registered functions can be referenced in Table API and SQL queries.
- *
- * @param name The name under which the function is registered.
- * @param tf The TableFunction to register.
- * @tparam T The type of the output row.
- */
- def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
- registerTableFunctionInternal(name, tf)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
deleted file mode 100644
index ce437c3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.Expression
-
-/**
- * Holds methods to convert a [[DataSet]] into a [[Table]].
- *
- * @param dataSet The [[DataSet]] to convert.
- * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
- * @tparam T The type of the [[DataSet]].
- */
-class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
-
- /**
- * Converts the [[DataSet]] into a [[Table]].
- *
- * The field name of the new [[Table]] can be specified like this:
- *
- * {{{
- * val env = ExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- * val set: DataSet[(String, Int)] = ...
- * val table = set.toTable(tEnv, 'name, 'amount)
- * }}}
- *
- * If not explicitly specified, field names are automatically extracted from the type of
- * the [[DataSet]].
- *
- * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
- * @param fields The field names of the new [[Table]] (optional).
- * @return The resulting [[Table]].
- */
- def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
- if (fields.isEmpty) {
- tableEnv.fromDataSet(dataSet)
- } else {
- tableEnv.fromDataSet(dataSet, fields: _*)
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
deleted file mode 100644
index 3b724cf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.DataStream
-
-/**
- * Holds methods to convert a [[DataStream]] into a [[Table]].
- *
- * @param dataStream The [[DataStream]] to convert.
- * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
- * @tparam T The type of the [[DataStream]].
- */
-class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
-
- /**
- * Converts the [[DataStream]] into a [[Table]].
- *
- * The field name of the new [[Table]] can be specified like this:
- *
- * {{{
- * val env = StreamExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- * val stream: DataStream[(String, Int)] = ...
- * val table = stream.toTable(tEnv, 'name, 'amount)
- * }}}
- *
- * If not explicitly specified, field names are automatically extracted from the type of
- * the [[DataStream]].
- *
- * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
- * @param fields The field names of the new [[Table]] (optional).
- * @return The resulting [[Table]].
- */
- def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
- if (fields.isEmpty) {
- tableEnv.fromDataStream(dataStream)
- } else {
- tableEnv.fromDataStream(dataStream, fields:_*)
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
deleted file mode 100644
index 044ace8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.{TableConfig, Table}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream}
-
-import org.apache.flink.streaming.api.scala.asScalaStream
-
-/**
- * The [[org.apache.flink.api.table.TableEnvironment]] for a Scala [[StreamExecutionEnvironment]].
- *
- * A TableEnvironment can be used to:
- * - convert a [[DataStream]] to a [[Table]]
- * - register a [[DataStream]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataStream]]
- * - explain the AST and execution plan of a [[Table]]
- *
- * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
- * @param config The configuration of the TableEnvironment.
- */
-class StreamTableEnvironment(
- execEnv: StreamExecutionEnvironment,
- config: TableConfig)
- extends org.apache.flink.api.table.StreamTableEnvironment(
- execEnv.getWrappedStreamExecutionEnvironment,
- config) {
-
- /**
- * Converts the given [[DataStream]] into a [[Table]].
- *
- * The field names of the [[Table]] are automatically derived from the type of the
- * [[DataStream]].
- *
- * @param dataStream The [[DataStream]] to be converted.
- * @tparam T The type of the [[DataStream]].
- * @return The converted [[Table]].
- */
- def fromDataStream[T](dataStream: DataStream[T]): Table = {
-
- val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream.javaStream)
- ingest(name)
- }
-
- /**
- * Converts the given [[DataStream]] into a [[Table]] with specified field names.
- *
- * Example:
- *
- * {{{
- * val stream: DataStream[(String, Long)] = ...
- * val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
- * }}}
- *
- * @param dataStream The [[DataStream]] to be converted.
- * @param fields The field names of the resulting [[Table]].
- * @tparam T The type of the [[DataStream]].
- * @return The converted [[Table]].
- */
- def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
-
- val name = createUniqueTableName()
- registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
- ingest(name)
- }
-
- /**
- * Registers the given [[DataStream]] as table in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * The field names of the [[Table]] are automatically derived
- * from the type of the [[DataStream]].
- *
- * @param name The name under which the [[DataStream]] is registered in the catalog.
- * @param dataStream The [[DataStream]] to register.
- * @tparam T The type of the [[DataStream]] to register.
- */
- def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
-
- checkValidTableName(name)
- registerDataStreamInternal(name, dataStream.javaStream)
- }
-
- /**
- * Registers the given [[DataStream]] as table with specified field names in the
- * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * Example:
- *
- * {{{
- * val set: DataStream[(String, Long)] = ...
- * tableEnv.registerDataStream("myTable", set, 'a, 'b)
- * }}}
- *
- * @param name The name under which the [[DataStream]] is registered in the catalog.
- * @param dataStream The [[DataStream]] to register.
- * @param fields The field names of the registered table.
- * @tparam T The type of the [[DataStream]] to register.
- */
- def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
-
- checkValidTableName(name)
- registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
- }
-
- /**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
- *
- * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
- * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
- *
- * @param table The [[Table]] to convert.
- * @tparam T The type of the resulting [[DataStream]].
- * @return The converted [[DataStream]].
- */
- def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
- asScalaStream(translate(table))
- }
-
- /**
- * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
- * Registered functions can be referenced in SQL queries.
- *
- * @param name The name under which the function is registered.
- * @param tf The TableFunction to register
- */
- def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
- registerTableFunctionInternal(name, tf)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
deleted file mode 100644
index 720dac0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.DataStream
-
-import org.apache.flink.api.table.{Table, TableException}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv}
-import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv}
-
-/**
- * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
- *
- * @param table The table to convert.
- */
-class TableConversions(table: Table) {
-
- /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
- def toDataSet[T: TypeInformation]: DataSet[T] = {
-
- table.tableEnv match {
- case tEnv: ScalaBatchTableEnv =>
- tEnv.toDataSet(table)
- case _ =>
- throw new TableException(
- "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
- }
- }
-
- /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
- def toDataStream[T: TypeInformation]: DataStream[T] = {
-
- table.tableEnv match {
- case tEnv: ScalaStreamTableEnv =>
- tEnv.toDataStream(table)
- case _ =>
- throw new TableException(
- "Only tables that originate from Scala DataStreams " +
- "can be converted to Scala DataStreams.")
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
deleted file mode 100644
index 823458a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.calcite.avatica.util.DateTimeUtils._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.expressions._
-import java.math.{BigDecimal => JBigDecimal}
-
-import scala.language.implicitConversions
-
-/**
- * These are all the operations that can be used to construct an [[Expression]] AST for expression
- * operations.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.table.expressions.ExpressionParser]].
- */
-trait ImplicitExpressionOperations {
- private[flink] def expr: Expression
-
- /**
- * Enables literals on left side of binary expressions.
- *
- * e.g. 12.toExpr % 'a
- *
- * @return expression
- */
- def toExpr: Expression = expr
-
- def && (other: Expression) = And(expr, other)
- def || (other: Expression) = Or(expr, other)
-
- def > (other: Expression) = GreaterThan(expr, other)
- def >= (other: Expression) = GreaterThanOrEqual(expr, other)
- def < (other: Expression) = LessThan(expr, other)
- def <= (other: Expression) = LessThanOrEqual(expr, other)
-
- def === (other: Expression) = EqualTo(expr, other)
- def !== (other: Expression) = NotEqualTo(expr, other)
-
- def unary_! = Not(expr)
- def unary_- = UnaryMinus(expr)
-
- def isNull = IsNull(expr)
- def isNotNull = IsNotNull(expr)
-
- /**
- * Returns true if given boolean expression is true. False otherwise (for null and false).
- */
- def isTrue = IsTrue(expr)
-
- /**
- * Returns true if given boolean expression is false. False otherwise (for null and true).
- */
- def isFalse = IsFalse(expr)
-
- /**
- * Returns true if given boolean expression is not true (for null and false). False otherwise.
- */
- def isNotTrue = IsNotTrue(expr)
-
- /**
- * Returns true if given boolean expression is not false (for null and true). False otherwise.
- */
- def isNotFalse = IsNotFalse(expr)
-
- def + (other: Expression) = Plus(expr, other)
- def - (other: Expression) = Minus(expr, other)
- def / (other: Expression) = Div(expr, other)
- def * (other: Expression) = Mul(expr, other)
- def % (other: Expression) = mod(other)
-
- def sum = Sum(expr)
- def min = Min(expr)
- def max = Max(expr)
- def count = Count(expr)
- def avg = Avg(expr)
-
- def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
- /**
- * Specifies a name for an expression i.e. a field.
- *
- * @param name name for one field
- * @param extraNames additional names if the expression expands to multiple fields
- * @return field with an alias
- */
- def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
-
- def asc = Asc(expr)
- def desc = Desc(expr)
-
- /**
- * Returns the start time of a window when applied on a window reference.
- */
- def start = WindowStart(expr)
-
- /**
- * Returns the end time of a window when applied on a window reference.
- */
- def end = WindowEnd(expr)
-
- /**
- * Ternary conditional operator that decides which of two other expressions should be evaluated
- * based on a evaluated boolean condition.
- *
- * e.g. (42 > 5).?("A", "B") leads to "A"
- *
- * @param ifTrue expression to be evaluated if condition holds
- * @param ifFalse expression to be evaluated if condition does not hold
- */
- def ?(ifTrue: Expression, ifFalse: Expression) = {
- If(expr, ifTrue, ifFalse)
- }
-
- // scalar functions
-
- /**
- * Calculates the remainder of division the given number by another one.
- */
- def mod(other: Expression) = Mod(expr, other)
-
- /**
- * Calculates the Euler's number raised to the given power.
- */
- def exp() = Exp(expr)
-
- /**
- * Calculates the base 10 logarithm of given value.
- */
- def log10() = Log10(expr)
-
- /**
- * Calculates the natural logarithm of given value.
- */
- def ln() = Ln(expr)
-
- /**
- * Calculates the given number raised to the power of the other value.
- */
- def power(other: Expression) = Power(expr, other)
-
- /**
- * Calculates the square root of a given value.
- */
- def sqrt() = Sqrt(expr)
-
- /**
- * Calculates the absolute value of given value.
- */
- def abs() = Abs(expr)
-
- /**
- * Calculates the largest integer less than or equal to a given number.
- */
- def floor() = Floor(expr)
-
- /**
- * Calculates the smallest integer greater than or equal to a given number.
- */
- def ceil() = Ceil(expr)
-
- // String operations
-
- /**
- * Creates a substring of the given string at given index for a given length.
- *
- * @param beginIndex first character of the substring (starting at 1, inclusive)
- * @param length number of characters of the substring
- * @return substring
- */
- def substring(beginIndex: Expression, length: Expression) =
- Substring(expr, beginIndex, length)
-
- /**
- * Creates a substring of the given string beginning at the given index to the end.
- *
- * @param beginIndex first character of the substring (starting at 1, inclusive)
- * @return substring
- */
- def substring(beginIndex: Expression) =
- new Substring(expr, beginIndex)
-
- /**
- * Removes leading and/or trailing characters from the given string.
- *
- * @param removeLeading if true, remove leading characters (default: true)
- * @param removeTrailing if true, remove trailing characters (default: true)
- * @param character string containing the character (default: " ")
- * @return trimmed string
- */
- def trim(
- removeLeading: Boolean = true,
- removeTrailing: Boolean = true,
- character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
- if (removeLeading && removeTrailing) {
- Trim(TrimMode.BOTH, character, expr)
- } else if (removeLeading) {
- Trim(TrimMode.LEADING, character, expr)
- } else if (removeTrailing) {
- Trim(TrimMode.TRAILING, character, expr)
- } else {
- expr
- }
- }
-
- /**
- * Returns the length of a string.
- */
- def charLength() = CharLength(expr)
-
- /**
- * Returns all of the characters in a string in upper case using the rules of
- * the default locale.
- */
- def upperCase() = Upper(expr)
-
- /**
- * Returns all of the characters in a string in lower case using the rules of
- * the default locale.
- */
- def lowerCase() = Lower(expr)
-
- /**
- * Converts the initial letter of each word in a string to uppercase.
- * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
- */
- def initCap() = InitCap(expr)
-
- /**
- * Returns true, if a string matches the specified LIKE pattern.
- *
- * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
- */
- def like(pattern: Expression) = Like(expr, pattern)
-
- /**
- * Returns true, if a string matches the specified SQL regex pattern.
- *
- * e.g. "A+" matches all strings that consist of at least one A
- */
- def similar(pattern: Expression) = Similar(expr, pattern)
-
- /**
- * Returns the position of string in an other string starting at 1.
- * Returns 0 if string could not be found.
- *
- * e.g. "a".position("bbbbba") leads to 6
- */
- def position(haystack: Expression) = Position(expr, haystack)
-
- /**
- * Replaces a substring of string with a string starting at a position (starting at 1).
- *
- * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
- */
- def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting)
-
- /**
- * Replaces a substring of string with a string starting at a position (starting at 1).
- * The length specifies how many characters should be removed.
- *
- * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
- */
- def overlay(newString: Expression, starting: Expression, length: Expression) =
- Overlay(expr, newString, starting, length)
-
- // Temporal operations
-
- /**
- * Parses a date string in the form "yy-mm-dd" to a SQL Date.
- */
- def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
-
- /**
- * Parses a time string in the form "hh:mm:ss" to a SQL Time.
- */
- def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
-
- /**
- * Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
- */
- def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
-
- /**
- * Extracts parts of a time point or time interval. Returns the part as a long value.
- *
- * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
- */
- def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
-
- /**
- * Returns the quarter of a year from a SQL date.
- *
- * e.g. "1994-09-27".toDate.quarter() leads to 3
- */
- def quarter() = Quarter(expr)
-
- /**
- * Rounds down a time point to the given unit.
- *
- * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
- */
- def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
-
- /**
- * Rounds up a time point to the given unit.
- *
- * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
- */
- def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
-
- // Interval types
-
- /**
- * Creates an interval of the given number of years.
- *
- * @return interval of months
- */
- def year = toMonthInterval(expr, 12)
-
- /**
- * Creates an interval of the given number of years.
- *
- * @return interval of months
- */
- def years = year
-
- /**
- * Creates an interval of the given number of months.
- *
- * @return interval of months
- */
- def month = toMonthInterval(expr, 1)
-
- /**
- * Creates an interval of the given number of months.
- *
- * @return interval of months
- */
- def months = month
-
- /**
- * Creates an interval of the given number of days.
- *
- * @return interval of milliseconds
- */
- def day = toMilliInterval(expr, MILLIS_PER_DAY)
-
- /**
- * Creates an interval of the given number of days.
- *
- * @return interval of milliseconds
- */
- def days = day
-
- /**
- * Creates an interval of the given number of hours.
- *
- * @return interval of milliseconds
- */
- def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
-
- /**
- * Creates an interval of the given number of hours.
- *
- * @return interval of milliseconds
- */
- def hours = hour
-
- /**
- * Creates an interval of the given number of minutes.
- *
- * @return interval of milliseconds
- */
- def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
-
- /**
- * Creates an interval of the given number of minutes.
- *
- * @return interval of milliseconds
- */
- def minutes = minute
-
- /**
- * Creates an interval of the given number of seconds.
- *
- * @return interval of milliseconds
- */
- def second = toMilliInterval(expr, MILLIS_PER_SECOND)
-
- /**
- * Creates an interval of the given number of seconds.
- *
- * @return interval of milliseconds
- */
- def seconds = second
-
- /**
- * Creates an interval of the given number of milliseconds.
- *
- * @return interval of milliseconds
- */
- def milli = toMilliInterval(expr, 1)
-
- /**
- * Creates an interval of the given number of milliseconds.
- *
- * @return interval of milliseconds
- */
- def millis = milli
-
- // row interval type
-
- /**
- * Creates an interval of rows.
- *
- * @return interval of rows
- */
- def rows = toRowInterval(expr)
-
- /**
- * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
- * returns it's value.
- *
- * @param name name of the field (similar to Flink's field expressions)
- * @return value of the field
- */
- def get(name: String) = GetCompositeField(expr, name)
-
- /**
- * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
- * returns it's value.
- *
- * @param index position of the field
- * @return value of the field
- */
- def get(index: Int) = GetCompositeField(expr, index)
-
- /**
- * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
- * into a flat representation where every subtype is a separate field.
- */
- def flatten() = Flattening(expr)
-
- /**
- * Accesses the element of an array based on an index (starting at 1).
- *
- * @param index position of the element (starting at 1)
- * @return value of the element
- */
- def at(index: Expression) = ArrayElementAt(expr, index)
-
- /**
- * Returns the number of elements of an array.
- *
- * @return number of elements
- */
- def cardinality() = ArrayCardinality(expr)
-
- /**
- * Returns the sole element of an array with a single element. Returns null if the array is
- * empty. Throws an exception if the array has more than one element.
- *
- * @return the first and only element of an array with a single element
- */
- def element() = ArrayElement(expr)
-}
-
-/**
- * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
- * to [[ImplicitExpressionOperations]].
- */
-trait ImplicitExpressionConversions {
- implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
- def expr = e
- }
-
- implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations {
- def expr = UnresolvedFieldReference(s.name)
- }
-
- implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
- def expr = Literal(l)
- }
-
- implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations {
- def expr = Literal(b)
- }
-
- implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations {
- def expr = Literal(s)
- }
-
- implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
- def expr = Literal(i)
- }
-
- implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
- def expr = Literal(f)
- }
-
- implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
- def expr = Literal(d)
- }
-
- implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
- def expr = Literal(str)
- }
-
- implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
- def expr = Literal(bool)
- }
-
- implicit class LiteralJavaDecimalExpression(javaDecimal: java.math.BigDecimal)
- extends ImplicitExpressionOperations {
- def expr = Literal(javaDecimal)
- }
-
- implicit class LiteralScalaDecimalExpression(scalaDecimal: scala.math.BigDecimal)
- extends ImplicitExpressionOperations {
- def expr = Literal(scalaDecimal.bigDecimal)
- }
-
- implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations {
- def expr = Literal(sqlDate)
- }
-
- implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations {
- def expr = Literal(sqlTime)
- }
-
- implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp)
- extends ImplicitExpressionOperations {
- def expr = Literal(sqlTimestamp)
- }
-
- implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
- implicit def byte2Literal(b: Byte): Expression = Literal(b)
- implicit def short2Literal(s: Short): Expression = Literal(s)
- implicit def int2Literal(i: Int): Expression = Literal(i)
- implicit def long2Literal(l: Long): Expression = Literal(l)
- implicit def double2Literal(d: Double): Expression = Literal(d)
- implicit def float2Literal(d: Float): Expression = Literal(d)
- implicit def string2Literal(str: String): Expression = Literal(str)
- implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
- implicit def javaDec2Literal(javaDec: JBigDecimal): Expression = Literal(javaDec)
- implicit def scalaDec2Literal(scalaDec: BigDecimal): Expression =
- Literal(scalaDec.bigDecimal)
- implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
- implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
- implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression =
- Literal(sqlTimestamp)
- implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
-
-/**
- * Returns the current SQL date in UTC time zone.
- */
-object currentDate {
-
- /**
- * Returns the current SQL date in UTC time zone.
- */
- def apply(): Expression = {
- CurrentDate()
- }
-}
-
-/**
- * Returns the current SQL time in UTC time zone.
- */
-object currentTime {
-
- /**
- * Returns the current SQL time in UTC time zone.
- */
- def apply(): Expression = {
- CurrentTime()
- }
-}
-
-/**
- * Returns the current SQL timestamp in UTC time zone.
- */
-object currentTimestamp {
-
- /**
- * Returns the current SQL timestamp in UTC time zone.
- */
- def apply(): Expression = {
- CurrentTimestamp()
- }
-}
-
-/**
- * Returns the current SQL time in local time zone.
- */
-object localTime {
-
- /**
- * Returns the current SQL time in local time zone.
- */
- def apply(): Expression = {
- LocalTime()
- }
-}
-
-/**
- * Returns the current SQL timestamp in local time zone.
- */
-object localTimestamp {
-
- /**
- * Returns the current SQL timestamp in local time zone.
- */
- def apply(): Expression = {
- LocalTimestamp()
- }
-}
-
-/**
- * Determines whether two anchored time intervals overlap. Time point and temporal are
- * transformed into a range defined by two time points (start, end). The function
- * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
- *
- * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
- *
- * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
- */
-object temporalOverlaps {
-
- /**
- * Determines whether two anchored time intervals overlap. Time point and temporal are
- * transformed into a range defined by two time points (start, end).
- *
- * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
- *
- * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
- */
- def apply(
- leftTimePoint: Expression,
- leftTemporal: Expression,
- rightTimePoint: Expression,
- rightTemporal: Expression): Expression = {
- TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
- }
-}
-
-/**
- * Creates an array of literals. The array will be an array of objects (not primitives).
- */
-object array {
-
- /**
- * Creates an array of literals. The array will be an array of objects (not primitives).
- */
- def apply(head: Expression, tail: Expression*): Expression = {
- ArrayConstructor(head +: tail.toSeq)
- }
-}
-
-// scalastyle:on object.name
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
deleted file mode 100644
index 16fda5b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/groupWindows.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
- * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
- * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
- * elements in 5 minutes intervals.
- */
-object Tumble {
-
- /**
- * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
- * windows. For example, a tumbling window of 5 minutes size groups
- * elements in 5 minutes intervals.
- *
- * @param size the size of the window as time or row-count interval.
- * @return a tumbling window
- */
- def over(size: Expression): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
- * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
- * a specified slide interval. If the slide interval is smaller than the window size, sliding
- * windows are overlapping. Thus, an element can be assigned to multiple windows.
- *
- * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
- * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
- * window evaluations.
- */
-object Slide {
-
- /**
- * Creates a sliding window. Sliding windows have a fixed size and slide by
- * a specified slide interval. If the slide interval is smaller than the window size, sliding
- * windows are overlapping. Thus, an element can be assigned to multiple windows.
- *
- * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
- * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
- * consecutive
- *
- * @param size the size of the window as time or row-count interval
- * @return a partially specified sliding window
- */
- def over(size: Expression): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
- * Helper object for creating a session window. The boundary of session windows are defined by
- * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
- * gap period.
- */
-object Session {
-
- /**
- * Creates a session window. The boundary of session windows are defined by
- * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
- * gap period.
- *
- * @param gap specifies how long (as interval of milliseconds) to wait for new data before
- * closing the session window.
- * @return a session window
- */
- def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
deleted file mode 100644
index 1e8bf39..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.Table
-import org.apache.flink.types.Row
-import scala.language.implicitConversions
-import org.apache.flink.streaming.api.scala._
-
-import scala.reflect.ClassTag
-
-/**
- * == Table API (Scala) ==
- *
- * Importing this package with:
- *
- * {{{
- * import org.apache.flink.api.scala.table._
- * }}}
- *
- * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
- * [[Table]]. This can be used to perform SQL-like queries on data. Please have
- * a look at [[Table]] to see which operations are supported and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
- * expression can be specified.
- *
- * When writing a query you can use Scala Symbols to refer to field names. One would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
- * Scala literal to an Expression literal, in those cases use `Literal`, as in `Literal(3)`.
- *
- * Example:
- *
- * {{{
- * import org.apache.flink.api.scala._
- * import org.apache.flink.api.scala.table._
- *
- * val env = ExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- * val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
- * val result = input
- * .toTable(tEnv, 'word, 'count)
- * .groupBy('word)
- * .select('word, 'count.avg)
- *
- * result.print()
- * }}}
- *
- */
-package object table extends ImplicitExpressionConversions {
-
- implicit def table2TableConversions(table: Table): TableConversions = {
- new TableConversions(table)
- }
-
- implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
- new DataSetConversions[T](set, set.getType())
- }
-
- implicit def table2RowDataSet(table: Table): DataSet[Row] = {
- val tableEnv = table.tableEnv.asInstanceOf[BatchTableEnvironment]
- tableEnv.toDataSet[Row](table)
- }
-
- implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
- new DataStreamConversions[T](set, set.dataType.asInstanceOf[CompositeType[T]])
- }
-
- implicit def table2RowDataStream(table: Table): DataStream[Row] = {
- val tableEnv = table.tableEnv.asInstanceOf[StreamTableEnvironment]
- tableEnv.toDataStream[Row](table)
- }
-
-}