You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/15 15:51:05 UTC
flink git commit: [FLINK-4599] [table] Add 'explain()' also to
StreamTableEnvironment
Repository: flink
Updated Branches:
refs/heads/master 5fdc72069 -> 545b72bee
[FLINK-4599] [table] Add 'explain()' also to StreamTableEnvironment
This closes #2485.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/545b72be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/545b72be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/545b72be
Branch: refs/heads/master
Commit: 545b72bee9b2297c9d1d2f5d59d6d839378fde92
Parents: 5fdc720
Author: chobeat <si...@gmail.com>
Authored: Fri Sep 9 11:27:41 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 15 17:45:30 2016 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 53 ++++++++++++++++
.../api/table/StreamTableEnvironment.scala | 16 +++++
.../api/scala/stream/ExplainStreamTest.scala | 64 ++++++++++++++++++++
.../test/scala/resources/testFilterStream0.out | 3 +
.../test/scala/resources/testUnionStream0.out | 4 ++
5 files changed, 140 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index fe3ddc3..b88a7da 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -2501,3 +2501,56 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
By default, the Table API supports `null` values. Null handling can be disabled to improve performance by setting the `nullCheck` property in the `TableConfig` to `false`.
{% top %}
+
+Explaining a Table
+----
+The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and Flink's Execution Plan of the Job.
+
+Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently, `StreamTableEnvironment` returns only the Abstract Syntax Tree; it does not yet support explaining the Execution Plan.
+
+The following code shows an example and the corresponding output:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
+DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
+
+Table table1 = tEnv.fromDataStream(stream1, "count, word");
+Table table2 = tEnv.fromDataStream(stream2, "count, word");
+Table table = table1.unionAll(table2);
+
+String explanation = tEnv.explain(table);
+System.out.println(explanation);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+val table = table1.unionAll(table2)
+
+val explanation: String = tEnv.explain(table)
+println(explanation)
+{% endhighlight %}
+</div>
+</div>
+
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+ LogicalTableScan(table=[[_DataStreamTable_0]])
+ LogicalTableScan(table=[[_DataStreamTable_1]])
+{% endhighlight %}
+
+{% top %}
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 4f57ae9..f73cd3f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -272,4 +272,20 @@ abstract class StreamTableEnvironment(
}
+  /**
+    * Returns the AST (the relational plan) of the given [[Table]] as a string.
+    *
+    * Only the AST is rendered here; no execution plan section is produced. The result is the
+    * "== Abstract Syntax Tree ==" header, a platform line separator, and the textual form of
+    * the table's relational node tree as produced by `RelOptUtil.toString`.
+    *
+    * @param table The table for which the AST will be returned.
+    * @return The header line followed by the textual AST of the table.
+    */
+  def explain(table: Table): String = {
+    val ast = RelOptUtil.toString(table.getRelNode)
+
+    "== Abstract Syntax Tree ==" + System.lineSeparator + ast
+  }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
new file mode 100644
index 0000000..71500f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+/**
+  * Verifies the string produced by `explain` on a stream table environment against
+  * expected plans stored under `src/test/scala/resources`.
+  */
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  // Root of the test classpath; expected-output files are resolved relative to it.
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  /**
+    * Reads an expected-output file and normalizes Windows line endings to "\n".
+    * The underlying source is closed even if reading fails.
+    *
+    * @param fileName Name of the file inside `src/test/scala/resources`.
+    * @return The file content with "\r\n" replaced by "\n".
+    */
+  private def readExpected(fileName: String): String = {
+    val source = scala.io.Source.fromFile(
+      testFilePath + "../../src/test/scala/resources/" + fileName)
+    try {
+      source.mkString.replaceAll("\\r\\n", "\n")
+    } finally {
+      source.close()
+    }
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    // JUnit expects (expected, actual); this order keeps failure messages accurate.
+    assertEquals(readExpected("testFilterStream0.out"), result)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    // JUnit expects (expected, actual); this order keeps failure messages accurate.
+    assertEquals(readExpected("testUnionStream0.out"), result)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
new file mode 100644
index 0000000..3fda6de
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -0,0 +1,3 @@
+== Abstract Syntax Tree ==
+LogicalFilter(condition=[=(MOD($0, 2), 0)])
+ LogicalTableScan(table=[[_DataStreamTable_0]])
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
new file mode 100644
index 0000000..b2e3000
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -0,0 +1,4 @@
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+ LogicalTableScan(table=[[_DataStreamTable_0]])
+ LogicalTableScan(table=[[_DataStreamTable_1]])