You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/15 15:51:05 UTC

flink git commit: [FLINK-4599] [table] Add 'explain()' also to StreamTableEnvironment

Repository: flink
Updated Branches:
  refs/heads/master 5fdc72069 -> 545b72bee


[FLINK-4599] [table] Add 'explain()' also to StreamTableEnvironment

This closes #2485.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/545b72be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/545b72be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/545b72be

Branch: refs/heads/master
Commit: 545b72bee9b2297c9d1d2f5d59d6d839378fde92
Parents: 5fdc720
Author: chobeat <si...@gmail.com>
Authored: Fri Sep 9 11:27:41 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 15 17:45:30 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 53 ++++++++++++++++
 .../api/table/StreamTableEnvironment.scala      | 16 +++++
 .../api/scala/stream/ExplainStreamTest.scala    | 64 ++++++++++++++++++++
 .../test/scala/resources/testFilterStream0.out  |  3 +
 .../test/scala/resources/testUnionStream0.out   |  4 ++
 5 files changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index fe3ddc3..b88a7da 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -2501,3 +2501,56 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
 By default, the Table API supports `null` values. Null handling can be disabled to improve preformance by setting the `nullCheck` property in the `TableConfig` to `false`.
 
 {% top %}
+
+Explaining a Table
+----
+The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and Flink's Execution Plan of the Job. 
+
+Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently, `StreamTableEnvironment` does not support explaining the Execution Plan, so only the Abstract Syntax Tree is returned for streaming tables.
+
+The following code shows an example and the corresponding output:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
+DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
+
+Table table1 = tEnv.fromDataStream(stream1, "count, word");
+Table table2 = tEnv.fromDataStream(stream2, "count, word");
+Table table = table1.unionAll(table2);
+
+String explanation = tEnv.explain(table);
+System.out.println(explanation);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+val table = table1.unionAll(table2)
+
+val explanation: String = tEnv.explain(table)
+println(explanation)
+{% endhighlight %}
+</div>
+</div>
+
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalTableScan(table=[[_DataStreamTable_0]])
+  LogicalTableScan(table=[[_DataStreamTable_1]])
+{% endhighlight %}
+
+{% top %}
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 4f57ae9..f73cd3f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -272,4 +272,20 @@ abstract class StreamTableEnvironment(
 
   }
 
+  /**
+    * Returns the AST (Abstract Syntax Tree) of the relational query that defines the given
+    * [[Table]]. The execution plan of the streaming job is not part of the returned string.
+    *
+    * @param table The table whose AST will be returned.
+    * @return The header "== Abstract Syntax Tree ==", a line separator, and the AST dump.
+    */
+  def explain(table: Table): String = {
+    // Render the table's RelNode tree as text; this is the only plan that is explained here.
+    val ast = RelOptUtil.toString(table.getRelNode)
+
+    "== Abstract Syntax Tree ==" +
+      System.lineSeparator +
+      ast
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
new file mode 100644
index 0000000..71500f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+/** Compares the output of `explain` on streaming tables with expected-output resource files. */
+class ExplainStreamTest extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  /** Reads an expected-output file, normalizes CRLF to LF, and always closes the file. */
+  private def readExpected(fileName: String): String = {
+    val src = scala.io.Source.fromFile(testFilePath + "../../src/test/scala/resources/" + fileName)
+    try src.mkString.replaceAll("\\r\\n", "\n") finally src.close()
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val actual = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    // assertEquals takes the expected value first so failure messages are labeled correctly.
+    assertEquals(readExpected("testFilterStream0.out"), actual)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val actual = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    assertEquals(readExpected("testUnionStream0.out"), actual)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
new file mode 100644
index 0000000..3fda6de
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -0,0 +1,3 @@
+== Abstract Syntax Tree ==
+LogicalFilter(condition=[=(MOD($0, 2), 0)])
+  LogicalTableScan(table=[[_DataStreamTable_0]])

http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
new file mode 100644
index 0000000..b2e3000
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -0,0 +1,4 @@
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalTableScan(table=[[_DataStreamTable_0]])
+  LogicalTableScan(table=[[_DataStreamTable_1]])