Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2015/12/25 09:31:28 UTC

[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1477

    [FLINK-3192] Add explain support to print ast and sql physical execution.

    The Table API does not currently support SQL explanation. This adds explain support that prints the AST (abstract syntax tree) and the physical execution plan of a query.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink SqlExplanation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1477
    
----
commit 3bb186378ccf4add375de6e5f88a96caf1104788
Author: gallenvara <ga...@126.com>
Date:   2015-12-11T05:57:51Z

    Add explain support to print the sql-execution plan.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r48752078
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -828,6 +828,18 @@ public JobExecutionResult execute() throws Exception {
     	 *                   be contacted to retrieve information relevant to the execution planning.
     	 */
     	public abstract String getExecutionPlan() throws Exception;
    +
    +	/**
    +	 * Creates the AST and plan with which the system will execute the program, and return them as
    +	 * sql-explanation format of HIVE.
    +	 * Note that this needs to be called,  before the plan is executed.
    +	 * 
    +	 * @param extended The sql explanation mode.
    +	 * @return The execution plan and AST of the program.
    +	 * @throws Exception Thrown, if the compiler could not be instantiated, or the master could not
    +	 *                   be contacted to retrieve information relevant to the execution planning.
    +	 */
    +	public abstract String getSqlExecutionPlan(boolean extended) throws Exception;
    --- End diff --
    
    I don't think we should add this method to all `ExecutionEnvironments`.
    The JSON string returned by `getExecutionPlan()` should already contain all relevant information. How about parsing that JSON string in the Table API and reformatting the information to match Hive's explain output?
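
A minimal sketch of the suggestion above, assuming the JSON plan has already been parsed into simple value objects: the formatting lives in a Table API helper, and `ExecutionEnvironment` needs no new method. The class `ExplainFormatSketch`, the type `StageInfo`, its fields, and the output layout are hypothetical illustrations, not the code of the pull request.

```java
import java.util.List;

final class ExplainFormatSketch {

    /** Hypothetical, already-parsed view of one node of the JSON execution plan. */
    static final class StageInfo {
        final int id;
        final String pact;
        final String contents;

        StageInfo(int id, String pact, String contents) {
            this.id = id;
            this.pact = pact;
            this.contents = contents;
        }
    }

    /** Renders the parsed nodes as an indented, explain-style text block. */
    static String format(List<StageInfo> stages) {
        StringBuilder sb = new StringBuilder();
        for (StageInfo stage : stages) {
            // One header line per stage, followed by one tab-indented detail line.
            sb.append("Stage ").append(stage.id).append(" : ").append(stage.pact).append('\n');
            sb.append('\t').append("content : ").append(stage.contents).append('\n');
        }
        return sb.toString();
    }
}
```

Keeping the formatter separate from the environment means only the Table API depends on the explain layout, which is the point of the review comment.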



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49096670
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/Nodes.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.parser;
    --- End diff --
    
    Please create a new package for the explain classes called `explain`



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r48751597
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---
    @@ -1,271 +1,291 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -package org.apache.flink.api.table
    -
    -import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer}
    -import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
    -import org.apache.flink.api.table.parser.ExpressionParser
    -import org.apache.flink.api.table.plan._
    -
    -/**
    - * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
    - * have [[org.apache.flink.api.scala.DataSet]] and
    - * [[org.apache.flink.streaming.api.scala.DataStream]].
    - *
    - * Use the methods of [[Table]] to transform data. Use
    - * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet
    - * or DataStream.
    - *
    - * When using Scala a [[Table]] can also be converted using implicit conversions.
    - *
    - * Example:
    - *
    - * {{{
    - *   val table = set.toTable('a, 'b)
    - *   ...
    - *   val table2 = ...
    - *   val set = table2.toDataSet[MyType]
    - * }}}
    - *
    - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
    - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
    - * syntax.
    - */
    -case class Table(private[flink] val operation: PlanNode) {
    -
    -  /**
    -   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
    -   * can contain complex expressions and aggregations.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
    -   * }}}
    -   */
    -  def select(fields: Expression*): Table = {
    -    val analyzer = new SelectionAnalyzer(operation.outputFields)
    -    val analyzedFields = fields.map(analyzer.analyze)
    -    val fieldNames = analyzedFields map(_.name)
    -    if (fieldNames.toSet.size != fieldNames.size) {
    -      throw new ExpressionException(s"Resulting fields names are not unique in expression" +
    -        s""" "${fields.mkString(", ")}".""")
    -    }
    -    this.copy(operation = Select(operation, analyzedFields))
    -  }
    -
    -  /**
    -   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
    -   * can contain complex expressions and aggregations.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.select("key, value.avg + " The average" as average, other.substring(0, 10)")
    -   * }}}
    -   */
    -  def select(fields: String): Table = {
    -    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    -    select(fieldExprs: _*)
    -  }
    -
    -  /**
    -   * Renames the fields of the expression result. Use this to disambiguate fields before
    -   * joining to operations.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.as('a, 'b)
    -   * }}}
    -   */
    -  def as(fields: Expression*): Table = {
    -    fields forall {
    -      f => f.isInstanceOf[UnresolvedFieldReference]
    -    } match {
    -      case true =>
    -      case false => throw new ExpressionException("Only field expression allowed in as().")
    -    }
    -    this.copy(operation = As(operation, fields.toArray map { _.name }))
    -  }
    -
    -  /**
    -   * Renames the fields of the expression result. Use this to disambiguate fields before
    -   * joining to operations.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.as("a, b")
    -   * }}}
    -   */
    -  def as(fields: String): Table = {
    -    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    -    as(fieldExprs: _*)
    -  }
    -
    -  /**
    -   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    -   * clause.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.filter('name === "Fred")
    -   * }}}
    -   */
    -  def filter(predicate: Expression): Table = {
    -    val analyzer = new PredicateAnalyzer(operation.outputFields)
    -    val analyzedPredicate = analyzer.analyze(predicate)
    -    this.copy(operation = Filter(operation, analyzedPredicate))
    -  }
    -
    -  /**
    -   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    -   * clause.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.filter("name = 'Fred'")
    -   * }}}
    -   */
    -  def filter(predicate: String): Table = {
    -    val predicateExpr = ExpressionParser.parseExpression(predicate)
    -    filter(predicateExpr)
    -  }
    -
    -  /**
    -   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    -   * clause.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.where('name === "Fred")
    -   * }}}
    -   */
    -  def where(predicate: Expression): Table = {
    -    filter(predicate)
    -  }
    -
    -  /**
    -   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    -   * clause.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.where("name = 'Fred'")
    -   * }}}
    -   */
    -  def where(predicate: String): Table = {
    -    filter(predicate)
    -  }
    -
    -  /**
    -   * Groups the elements on some grouping keys. Use this before a selection with aggregations
    -   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.groupBy('key).select('key, 'value.avg)
    -   * }}}
    -   */
    -  def groupBy(fields: Expression*): Table = {
    -    val analyzer = new GroupByAnalyzer(operation.outputFields)
    -    val analyzedFields = fields.map(analyzer.analyze)
    -
    -    val illegalKeys = analyzedFields filter {
    -      case fe: ResolvedFieldReference => false // OK
    -      case e => true
    -    }
    -
    -    if (illegalKeys.nonEmpty) {
    -      throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
    -    }
    -
    -    this.copy(operation = GroupBy(operation, analyzedFields))
    -  }
    -
    -  /**
    -   * Groups the elements on some grouping keys. Use this before a selection with aggregations
    -   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   in.groupBy("key").select("key, value.avg")
    -   * }}}
    -   */
    -  def groupBy(fields: String): Table = {
    -    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    -    groupBy(fieldsExpr: _*)
    -  }
    -
    -  /**
    -   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    -   * operations must not overlap, use [[as]] to rename fields if necessary. You can use
    -   * where and select clauses after a join to further specify the behaviour of the join.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
    -   * }}}
    -   */
    -  def join(right: Table): Table = {
    -    val leftInputNames = operation.outputFields.map(_._1).toSet
    -    val rightInputNames = right.operation.outputFields.map(_._1).toSet
    -    if (leftInputNames.intersect(rightInputNames).nonEmpty) {
    -      throw new ExpressionException(
    -        "Overlapping fields names on join input, result would be ambiguous: " +
    -          operation.outputFields.mkString(", ") +
    -          " and " +
    -          right.operation.outputFields.mkString(", ") )
    -    }
    -    this.copy(operation = Join(operation, right.operation))
    -  }
    -
    -  /**
    -   * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
    -   * must fully overlap.
    -   *
    -   * Example:
    -   *
    -   * {{{
    -   *   left.unionAll(right)
    -   * }}}
    -   */
    -  def unionAll(right: Table): Table = {
    -    val leftInputFields = operation.outputFields
    -    val rightInputFields = right.operation.outputFields
    -    if (!leftInputFields.equals(rightInputFields)) {
    -      throw new ExpressionException(
    -        "The fields names of join inputs should be fully overlapped, left inputs fields:" +
    -          operation.outputFields.mkString(", ") +
    -          " and right inputs fields" +
    -          right.operation.outputFields.mkString(", ")
    -      )
    -    }
    -    this.copy(operation = UnionAll(operation, right.operation))
    -  }
    -
    -  override def toString: String = s"Expression($operation)"
    -}
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table
    +
    +import org.apache.flink.api.java.io.DiscardingOutputFormat
    +import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer}
    +import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
    +import org.apache.flink.api.table.parser.ExpressionParser
    +import org.apache.flink.api.table.plan._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +
    +/**
    + * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
    + * have [[org.apache.flink.api.scala.DataSet]] and
    + * [[org.apache.flink.streaming.api.scala.DataStream]].
    + *
    + * Use the methods of [[Table]] to transform data. Use
    + * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet
    + * or DataStream.
    + *
    + * When using Scala a [[Table]] can also be converted using implicit conversions.
    + *
    + * Example:
    + *
    + * {{{
    + *   val table = set.toTable('a, 'b)
    + *   ...
    + *   val table2 = ...
    + *   val set = table2.toDataSet[MyType]
    + * }}}
    + *
    + * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
    + * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
    + * syntax.
    + */
    +case class Table(private[flink] val operation: PlanNode) {
    +
    +  /**
    +   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
    +   * can contain complex expressions and aggregations.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
    +   * }}}
    +   */
    +  def select(fields: Expression*): Table = {
    +    val analyzer = new SelectionAnalyzer(operation.outputFields)
    +    val analyzedFields = fields.map(analyzer.analyze)
    +    val fieldNames = analyzedFields map(_.name)
    +    if (fieldNames.toSet.size != fieldNames.size) {
    +      throw new ExpressionException(s"Resulting fields names are not unique in expression" +
    +        s""" "${fields.mkString(", ")}".""")
    +    }
    +    this.copy(operation = Select(operation, analyzedFields))
    +  }
    +
    +  /**
    +   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
    +   * can contain complex expressions and aggregations.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.select("key, value.avg + " The average" as average, other.substring(0, 10)")
    +   * }}}
    +   */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
    +
    +  /**
    +   * Renames the fields of the expression result. Use this to disambiguate fields before
    +   * joining to operations.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.as('a, 'b)
    +   * }}}
    +   */
    +  def as(fields: Expression*): Table = {
    +    fields forall {
    +      f => f.isInstanceOf[UnresolvedFieldReference]
    +    } match {
    +      case true =>
    +      case false => throw new ExpressionException("Only field expression allowed in as().")
    +    }
    +    this.copy(operation = As(operation, fields.toArray map { _.name }))
    +  }
    +
    +  /**
    +   * Renames the fields of the expression result. Use this to disambiguate fields before
    +   * joining to operations.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.as("a, b")
    +   * }}}
    +   */
    +  def as(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    as(fieldExprs: _*)
    +  }
    +
    +  /**
    +   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    +   * clause.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.filter('name === "Fred")
    +   * }}}
    +   */
    +  def filter(predicate: Expression): Table = {
    +    val analyzer = new PredicateAnalyzer(operation.outputFields)
    +    val analyzedPredicate = analyzer.analyze(predicate)
    +    this.copy(operation = Filter(operation, analyzedPredicate))
    +  }
    +
    +  /**
    +   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    +   * clause.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.filter("name = 'Fred'")
    +   * }}}
    +   */
    +  def filter(predicate: String): Table = {
    +    val predicateExpr = ExpressionParser.parseExpression(predicate)
    +    filter(predicateExpr)
    +  }
    +
    +  /**
    +   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    +   * clause.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.where('name === "Fred")
    +   * }}}
    +   */
    +  def where(predicate: Expression): Table = {
    +    filter(predicate)
    +  }
    +
    +  /**
    +   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
    +   * clause.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.where("name = 'Fred'")
    +   * }}}
    +   */
    +  def where(predicate: String): Table = {
    +    filter(predicate)
    +  }
    +
    +  /**
    +   * Groups the elements on some grouping keys. Use this before a selection with aggregations
    +   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.groupBy('key).select('key, 'value.avg)
    +   * }}}
    +   */
    +  def groupBy(fields: Expression*): Table = {
    +    val analyzer = new GroupByAnalyzer(operation.outputFields)
    +    val analyzedFields = fields.map(analyzer.analyze)
    +
    +    val illegalKeys = analyzedFields filter {
    +      case fe: ResolvedFieldReference => false // OK
    +      case e => true
    +    }
    +
    +    if (illegalKeys.nonEmpty) {
    +      throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
    +    }
    +
    +    this.copy(operation = GroupBy(operation, analyzedFields))
    +  }
    +
    +  /**
    +   * Groups the elements on some grouping keys. Use this before a selection with aggregations
    +   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   in.groupBy("key").select("key, value.avg")
    +   * }}}
    +   */
    +  def groupBy(fields: String): Table = {
    +    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    +    groupBy(fieldsExpr: _*)
    +  }
    +
    +  /**
    +   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    +   * operations must not overlap, use [[as]] to rename fields if necessary. You can use
    +   * where and select clauses after a join to further specify the behaviour of the join.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
    +   * }}}
    +   */
    +  def join(right: Table): Table = {
    +    val leftInputNames = operation.outputFields.map(_._1).toSet
    +    val rightInputNames = right.operation.outputFields.map(_._1).toSet
    +    if (leftInputNames.intersect(rightInputNames).nonEmpty) {
    +      throw new ExpressionException(
    +        "Overlapping fields names on join input, result would be ambiguous: " +
    +          operation.outputFields.mkString(", ") +
    +          " and " +
    +          right.operation.outputFields.mkString(", ") )
    +    }
    +    this.copy(operation = Join(operation, right.operation))
    +  }
    +
    +  /**
    +   * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
    +   * must fully overlap.
    +   *
    +   * Example:
    +   *
    +   * {{{
    +   *   left.unionAll(right)
    +   * }}}
    +   */
    +  def unionAll(right: Table): Table = {
    +    val leftInputFields = operation.outputFields
    +    val rightInputFields = right.operation.outputFields
    +    if (!leftInputFields.equals(rightInputFields)) {
    +      throw new ExpressionException(
    +        "The fields names of join inputs should be fully overlapped, left inputs fields:" +
    +          operation.outputFields.mkString(", ") +
    +          " and right inputs fields" +
    +          right.operation.outputFields.mkString(", ")
    +      )
    +    }
    +    this.copy(operation = UnionAll(operation, right.operation))
    +  }
    +
    +  /**
    +   * Get the process of the sql parsing, print AST and physical execution plan.The AST
    +   * show the structure of the supplied statement. The execution plan shows how the table 
    +   * referenced by the statement will be scanned.
    +   */
    +  def explain(extended: Boolean): String = {
    --- End diff --
    
    It looks like you added only this method.
    Can you check why the whole file is marked as changed?



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49097137
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/PlanJsonParser.java ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.parser;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.DeserializationFeature;
    +
    +import java.io.PrintWriter;
    +import java.io.StringWriter;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +
    +public class PlanJsonParser {
    +
    +	public String getSqlExecutionPlan(String t, boolean extended) throws Exception{
    +		StringWriter sw = new StringWriter();
    +		PrintWriter pw = new PrintWriter(sw);
    +		json2Map(t, extended, pw);
    +		pw.close();
    +		return sw.toString();
    +	}
    +
    +	public void printTab(int tabCount, PrintWriter pw) {
    +		for (int i = 0; i < tabCount; i++)
    +			pw.print("\t");
    +	}
    +
    +	public PlanTrees json2Map(String t, Boolean extended, PrintWriter pw) throws Exception {
    --- End diff --
    
    Please rename this method to something like `buildSqlPlanString`, make it `private` and change the return type to `void`.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169270202
  
    Thanks @gallenvara. 
    Flink uses Jackson to handle JSON data. You can use it to parse the JSON String.
    
    Looking forward to your update :-)



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49183864
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.explain;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.DeserializationFeature;
    +
    +import java.io.PrintWriter;
    +import java.io.StringWriter;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +
    +public class PlanJsonParser {
    +	
    +	public static String getSqlExecutionPlan(String t, boolean extended) throws Exception{
    +		StringWriter sw = new StringWriter();
    +		PrintWriter pw = new PrintWriter(sw);
    +		buildSqlExecutionPlan(t, extended, pw);
    +		pw.close();
    +		return sw.toString();
    +	}
    +
    +	private static void printTab(int tabCount, PrintWriter pw) {
    +		for (int i = 0; i < tabCount; i++)
    +			pw.print("\t");
    +	}
    +
    +	private static void buildSqlExecutionPlan(String t, Boolean extended, PrintWriter pw) throws Exception {
    --- End diff --
    
    I think we can merge `buildSqlExecutionPlan` and `getSqlExecutionPlan` into a single method.
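
One way the merged method could look, as a rough sketch only: the writer setup and the plan printing sit in a single static method, and `printTab` stays a private helper. The input type (a list of stage names instead of the JSON string), the class name `MergedPlanPrinterSketch`, and the meaning given to `extended` are assumptions made for illustration.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;

final class MergedPlanPrinterSketch {

    // Hypothetical input: stage names that were already extracted from the JSON plan.
    static String getSqlExecutionPlan(List<String> stageNames, boolean extended) {
        StringWriter sw = new StringWriter();
        // try-with-resources closes (and flushes) the PrintWriter before the result is read.
        try (PrintWriter pw = new PrintWriter(sw)) {
            for (int i = 0; i < stageNames.size(); i++) {
                pw.print("Stage " + i + " : " + stageNames.get(i) + "\n");
                if (extended) {
                    // Assumed meaning of "extended": print one extra, indented detail line.
                    printTab(1, pw);
                    pw.print("position : " + i + "\n");
                }
            }
        }
        return sw.toString();
    }

    private static void printTab(int tabCount, PrintWriter pw) {
        for (int i = 0; i < tabCount; i++) {
            pw.print("\t");
        }
    }
}
```

With a single entry point, callers cannot invoke the printing step without the writer that collects its output.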



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169261591
  
    @fhueske, thanks a lot for the review work! I'll modify the code and update the PR according to your advice.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1477



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49096753
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/PlanTrees.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.parser;
    +
    +import java.util.List;
    +
    +public class PlanTrees {
    --- End diff --
    
    Make this an internal class of `PlanJsonParser`.
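
A minimal sketch of what nesting the holder class could look like, assuming a simplified shape. The names `PlanJsonParserSketch`, `PlanTree`, `getNodes`, and `countNodes` are hypothetical, and the node list is reduced to strings; the real `PlanTrees` fields are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical outline: the holder becomes a static nested class of the parser,
// so it no longer needs its own top-level file.
final class PlanJsonParserSketch {

    // Nested holder for the top-level node list (field name is an assumption).
    static final class PlanTree {
        private final List<String> nodes = new ArrayList<>();

        List<String> getNodes() {
            return nodes;
        }
    }

    // The enclosing parser can use the nested type directly.
    static int countNodes(PlanTree tree) {
        return tree.getNodes().size();
    }
}
```

Declaring the nested class `static` keeps it independent of any parser instance, which matters if a JSON library has to instantiate it reflectively.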



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-170712317
  
    @gallenvara, then let's keep it like this. Thanks for the update. The PR is good to merge.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169882956
  
    @fhueske, @rmetzger, thanks for the review. I have modified the related code as you advised and submitted a new commit.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49161249
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---
    @@ -267,5 +271,24 @@ case class Table(private[flink] val operation: PlanNode) {
         this.copy(operation = UnionAll(operation, right.operation))
       }
     
    +  /**
    +   * Get the process of the sql parsing, print AST and physical execution plan.The AST
    +   * show the structure of the supplied statement. The execution plan shows how the table 
    +   * referenced by the statement will be scanned.
    +   */
    +  def explain(extended: Boolean): String = {
    +    val ast = operation
    +    val dataSet = this.toDataSet[Row]
    +    val env = dataSet.getExecutionEnvironment
    +    dataSet.output(new DiscardingOutputFormat[Row])
    +    val string = env.getExecutionPlan()
    --- End diff --
    
    It would be better to use a more meaningful variable name here.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49072116
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -94,6 +94,12 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     
    +        <dependency> 
    +            <groupId>com.fasterxml.jackson.core</groupId>
    +            <artifactId>jackson-databind</artifactId>
    +            <version>2.4.4</version>
    --- End diff --
    
    Can you use the `${jackson.version}` variable here?



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-170467184
  
    @fhueske, I'm in favor of showing the whole plan. First, the `Table` API converts a `DataSet` to a `Table` and provides several operations on the `DataSource`. It is a data-processing level and part of the DataSet. Showing the whole plan will help users better understand the complete DataSet program. Second, `getExecutionPlan` returns the plan from the initial `DataSource` to the final `DataSink`. As far as I understand, it is not easy to extract only the query operations.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49161453
  
    --- Diff: pom.xml ---
    @@ -796,6 +796,7 @@ under the License.
     						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
     						<exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
     						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
    +                        <exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude>
    --- End diff --
    
    Indentation issue.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169733257
  
    Hi @gallenvara, the PR looks much better. I added a few comments inline.
    
    Thanks, Fabian



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-168736055
  
    Hi @gallenvara, thanks for this PR!
    Explain is a very nice feature for the Table API (and later on for the SQL interface).
    
    IMO, it would be better to reuse the existing JSON plan and generate the HIVE visualization locally inside of the Table API. That would avoid replicating quite a bit of functionality just for the sake of a different visualization. Would that work as well?
    
    In general, it is a good idea to discuss changes that modify user-facing APIs and touch several components/modules before starting to code as pointed out in our [contribution guidelines](http://flink.apache.org/contribute-code.html).
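
To make the "reuse the JSON plan and render it locally" idea concrete, here is a minimal sketch of the rendering half only. It assumes the JSON plan has already been parsed into a list of maps, and that each node carries `pact` and `contents` entries; the class name `LocalPlanRendererSketch`, the key names, and the output layout are all assumptions for illustration, not the PR's actual code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical renderer: turns already-parsed plan nodes into indented text.
// Parsing the JSON string itself (e.g. with a JSON library) is out of scope here.
final class LocalPlanRendererSketch {

    static String render(List<Map<String, String>> nodes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < nodes.size(); i++) {
            Map<String, String> node = nodes.get(i);
            // Indent each stage one level deeper than the previous one.
            appendTabs(sb, i);
            sb.append("Stage ").append(i + 1).append(" : ")
              .append(node.get("pact")).append('\n');
            appendTabs(sb, i + 1);
            sb.append("content : ").append(node.get("contents")).append('\n');
        }
        return sb.toString();
    }

    private static void appendTabs(StringBuilder sb, int count) {
        for (int t = 0; t < count; t++) {
            sb.append('\t');
        }
    }
}
```

With this split, the execution environment keeps exposing only the existing JSON plan, and the text layout stays entirely inside the Table API module.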



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r48751756
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
    @@ -1,271 +1,287 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements.  See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership.  The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License.  You may obtain a copy of the License at
    - *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.flink.api.java;
    -
    -import org.apache.flink.api.common.InvalidProgramException;
    -import org.apache.flink.api.common.JobExecutionResult;
    -import org.apache.flink.api.common.JobID;
    -import org.apache.flink.api.common.Plan;
    -import org.apache.flink.api.common.PlanExecutor;
    -import org.apache.flink.configuration.Configuration;
    -
    -import java.io.File;
    -import java.net.MalformedURLException;
    -import java.net.URL;
    -
    -/**
    - * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
    - * needs to be created with the address and port of the JobManager of the Flink cluster that
    - * should execute the programs.
    - * 
    - * <p>Many programs executed via the remote environment depend on additional classes. Such classes
    - * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
    - * must be attached to the remote environment as JAR files, to allow the environment to ship the
    - * classes into the cluster for the distributed execution.
    - */
    -public class RemoteEnvironment extends ExecutionEnvironment {
    -	
    -	/** The hostname of the JobManager */
    -	protected final String host;
    -
    -	/** The port of the JobManager main actor system */
    -	protected final int port;
    -
    -	/** The jar files that need to be attached to each job */
    -	private final URL[] jarFiles;
    -
    -	/** The configuration used by the client that connects to the cluster */
    -	private Configuration clientConfiguration;
    -	
    -	/** The remote executor lazily created upon first use */
    -	private PlanExecutor executor;
    -	
    -	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
    -	private Thread shutdownHook;
    -
    -	/** The classpaths that need to be attached to each job */
    -	private final URL[] globalClasspaths;
    -
    -	/**
    -	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    -	 * given host name and port.
    -	 *
    -	 * <p>Each program execution will have all the given JAR files in its classpath.
    -	 *
    -	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    -	 * @param port The port of the master (JobManager), where the program should be executed.
    -	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    -	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    -	 *                 provided in the JAR files.
    -	 */
    -	public RemoteEnvironment(String host, int port, String... jarFiles) {
    -		this(host, port, null, jarFiles, null);
    -	}
    -
    -	/**
    -	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    -	 * given host name and port.
    -	 *
    -	 * <p>Each program execution will have all the given JAR files in its classpath.
    -	 *
    -	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    -	 * @param port The port of the master (JobManager), where the program should be executed.
    -	 * @param clientConfig The configuration used by the client that connects to the cluster.
    -	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    -	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    -	 *                 provided in the JAR files.
    -	 */
    -	public RemoteEnvironment(String host, int port, Configuration clientConfig, String[] jarFiles) {
    -		this(host, port, clientConfig, jarFiles, null);
    -	}
    -
    -	/**
    -	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    -	 * given host name and port.
    -	 * 
    -	 * <p>Each program execution will have all the given JAR files in its classpath.
    -	 *
    -	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    -	 * @param port The port of the master (JobManager), where the program should be executed.
    -	 * @param clientConfig The configuration used by the client that connects to the cluster.
    -	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    -	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    -	 *                 provided in the JAR files.
    -	 * @param globalClasspaths The paths of directories and JAR files that are added to each user code 
    -	 *                 classloader on all nodes in the cluster. Note that the paths must specify a 
    -	 *                 protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
    -	 *                 The protocol must be supported by the {@link java.net.URLClassLoader}.
    -	 */
    -	public RemoteEnvironment(String host, int port, Configuration clientConfig,
    -			String[] jarFiles, URL[] globalClasspaths) {
    -		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
    -			throw new InvalidProgramException(
    -					"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
    -							"(such as Command Line Client, Scala Shell, or TestEnvironment)");
    -		}
    -		if (host == null) {
    -			throw new NullPointerException("Host must not be null.");
    -		}
    -		if (port < 1 || port >= 0xffff) {
    -			throw new IllegalArgumentException("Port out of range");
    -		}
    -
    -		this.host = host;
    -		this.port = port;
    -		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
    -		if (jarFiles != null) {
    -			this.jarFiles = new URL[jarFiles.length];
    -			for (int i = 0; i < jarFiles.length; i++) {
    -				try {
    -					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
    -				} catch (MalformedURLException e) {
    -					throw new IllegalArgumentException("JAR file path invalid", e);
    -				}
    -			}
    -		}
    -		else {
    -			this.jarFiles = null;
    -		}
    -		this.globalClasspaths = globalClasspaths;
    -	}
    -
    -	// ------------------------------------------------------------------------
    -
    -	@Override
    -	public JobExecutionResult execute(String jobName) throws Exception {
    -		ensureExecutorCreated();
    -
    -		Plan p = createProgramPlan(jobName);
    -
    -		// Session management is disabled, revert this commit to enable
    -		//p.setJobId(jobID);
    -		//p.setSessionTimeout(sessionTimeout);
    -
    -		JobExecutionResult result = executor.executePlan(p);
    -
    -		this.lastJobExecutionResult = result;
    -		return result;
    -	}
    -
    -	@Override
    -	public String getExecutionPlan() throws Exception {
    -		Plan p = createProgramPlan("plan", false);
    -
    -		// make sure that we do not start an new executor here
    -		// if one runs, fine, of not, we create a local executor (lightweight) and let it
    -		// generate the plan
    -		if (executor != null) {
    -			return executor.getOptimizerPlanAsJSON(p);
    -		}
    -		else {
    -			PlanExecutor le = PlanExecutor.createLocalExecutor(null);
    -			return le.getOptimizerPlanAsJSON(p);
    -		}
    -	}
    -
    -	@Override
    -	public void startNewSession() throws Exception {
    -		dispose();
    -		jobID = JobID.generate();
    -		installShutdownHook();
    -	}
    -	
    -	private void ensureExecutorCreated() throws Exception {
    -		if (executor == null) {
    -			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
    -				jarFiles, globalClasspaths);
    -			executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
    -		}
    -		
    -		// if we are using sessions, we keep the executor running
    -		if (getSessionTimeout() > 0 && !executor.isRunning()) {
    -			executor.start();
    -			installShutdownHook();
    -		}
    -	}
    -
    -	// ------------------------------------------------------------------------
    -	//  Dispose
    -	// ------------------------------------------------------------------------
    -
    -	protected void dispose() {
    -		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
    -		// shutdown hook itself
    -		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
    -			try {
    -				Runtime.getRuntime().removeShutdownHook(shutdownHook);
    -			}
    -			catch (IllegalStateException e) {
    -				// race, JVM is in shutdown already, we can safely ignore this
    -			}
    -			catch (Throwable t) {
    -				LOG.warn("Exception while unregistering the cleanup shutdown hook.");
    -			}
    -		}
    -		
    -		try {
    -			PlanExecutor executor = this.executor;
    -			if (executor != null) {
    -				executor.endSession(jobID);
    -				executor.stop();
    -			}
    -		}
    -		catch (Exception e) {
    -			throw new RuntimeException("Failed to dispose the session shutdown hook.");
    -		}
    -	}
    -	
    -	@Override
    -	public String toString() {
    -		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
    -				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
    -	}
    -	
    -	// ------------------------------------------------------------------------
    -	//  Shutdown hooks and reapers
    -	// ------------------------------------------------------------------------
    -
    -	private void installShutdownHook() {
    -		if (shutdownHook == null) {
    -			Thread shutdownHook = new Thread(new Runnable() {
    -				@Override
    -				public void run() {
    -					try {
    -						dispose();
    -					}
    -					catch (Throwable t) {
    -						LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(), t);
    -					}
    -				}
    -			});
    -	
    -			try {
    -				// Add JVM shutdown hook to call shutdown of service
    -				Runtime.getRuntime().addShutdownHook(shutdownHook);
    -				this.shutdownHook = shutdownHook;
    -			}
    -			catch (IllegalStateException e) {
    -				// JVM is already shutting down. no need or a shutdown hook
    -			}
    -			catch (Throwable t) {
    -				LOG.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
    -			}
    -		}
    -	}
    -}
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java;
    +
    +import org.apache.flink.api.common.InvalidProgramException;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.common.PlanExecutor;
    +import org.apache.flink.configuration.Configuration;
    +
    +import java.io.File;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +
    +/**
    + * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
    + * needs to be created with the address and port of the JobManager of the Flink cluster that
    + * should execute the programs.
    + * 
    + * <p>Many programs executed via the remote environment depend on additional classes. Such classes
    + * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
    + * must be attached to the remote environment as JAR files, to allow the environment to ship the
    + * classes into the cluster for the distributed execution.
    + */
    +public class RemoteEnvironment extends ExecutionEnvironment {
    +	
    +	/** The hostname of the JobManager */
    +	protected final String host;
    +
    +	/** The port of the JobManager main actor system */
    +	protected final int port;
    +
    +	/** The jar files that need to be attached to each job */
    +	private final URL[] jarFiles;
    +
    +	/** The configuration used by the client that connects to the cluster */
    +	private Configuration clientConfiguration;
    +	
    +	/** The remote executor lazily created upon first use */
    +	private PlanExecutor executor;
    +	
    +	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
    +	private Thread shutdownHook;
    +
    +	/** The classpaths that need to be attached to each job */
    +	private final URL[] globalClasspaths;
    +
    +	/**
    +	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    +	 * given host name and port.
    +	 *
    +	 * <p>Each program execution will have all the given JAR files in its classpath.
    +	 *
    +	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    +	 * @param port The port of the master (JobManager), where the program should be executed.
    +	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    +	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    +	 *                 provided in the JAR files.
    +	 */
    +	public RemoteEnvironment(String host, int port, String... jarFiles) {
    +		this(host, port, null, jarFiles, null);
    +	}
    +
    +	/**
    +	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    +	 * given host name and port.
    +	 *
    +	 * <p>Each program execution will have all the given JAR files in its classpath.
    +	 *
    +	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    +	 * @param port The port of the master (JobManager), where the program should be executed.
    +	 * @param clientConfig The configuration used by the client that connects to the cluster.
    +	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    +	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    +	 *                 provided in the JAR files.
    +	 */
    +	public RemoteEnvironment(String host, int port, Configuration clientConfig, String[] jarFiles) {
    +		this(host, port, clientConfig, jarFiles, null);
    +	}
    +
    +	/**
    +	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
    +	 * given host name and port.
    +	 * 
    +	 * <p>Each program execution will have all the given JAR files in its classpath.
    +	 *
    +	 * @param host The host name or address of the master (JobManager), where the program should be executed.
    +	 * @param port The port of the master (JobManager), where the program should be executed.
    +	 * @param clientConfig The configuration used by the client that connects to the cluster.
    +	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
    +	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
    +	 *                 provided in the JAR files.
    +	 * @param globalClasspaths The paths of directories and JAR files that are added to each user code 
    +	 *                 classloader on all nodes in the cluster. Note that the paths must specify a 
    +	 *                 protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
    +	 *                 The protocol must be supported by the {@link java.net.URLClassLoader}.
    +	 */
    +	public RemoteEnvironment(String host, int port, Configuration clientConfig,
    +			String[] jarFiles, URL[] globalClasspaths) {
    +		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
    +			throw new InvalidProgramException(
    +					"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
    +							"(such as Command Line Client, Scala Shell, or TestEnvironment)");
    +		}
    +		if (host == null) {
    +			throw new NullPointerException("Host must not be null.");
    +		}
    +		if (port < 1 || port >= 0xffff) {
    +			throw new IllegalArgumentException("Port out of range");
    +		}
    +
    +		this.host = host;
    +		this.port = port;
    +		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
    +		if (jarFiles != null) {
    +			this.jarFiles = new URL[jarFiles.length];
    +			for (int i = 0; i < jarFiles.length; i++) {
    +				try {
    +					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
    +				} catch (MalformedURLException e) {
    +					throw new IllegalArgumentException("JAR file path invalid", e);
    +				}
    +			}
    +		}
    +		else {
    +			this.jarFiles = null;
    +		}
    +		this.globalClasspaths = globalClasspaths;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public JobExecutionResult execute(String jobName) throws Exception {
    +		ensureExecutorCreated();
    +
    +		Plan p = createProgramPlan(jobName);
    +
    +		// Session management is disabled, revert this commit to enable
    +		//p.setJobId(jobID);
    +		//p.setSessionTimeout(sessionTimeout);
    +
    +		JobExecutionResult result = executor.executePlan(p);
    +
    +		this.lastJobExecutionResult = result;
    +		return result;
    +	}
    +
    +	@Override
    +	public String getExecutionPlan() throws Exception {
    +		Plan p = createProgramPlan("plan", false);
    +
    +		// make sure that we do not start an new executor here
    +		// if one runs, fine, or not, we create a local executor (lightweight) and let it
    +		// generate the plan
    +		if (executor != null) {
    +			return executor.getOptimizerPlanAsJSON(p);
    +		}
    +		else {
    +			PlanExecutor le = PlanExecutor.createLocalExecutor(null);
    +			return le.getOptimizerPlanAsJSON(p);
    +		}
    +	}
    +
    +	@Override
    +	public String getSqlExecutionPlan(boolean extended) throws Exception {
    --- End diff --
    
    I think you added only this method. 
    Can you check why the whole file is marked as changed and revert unrelated changes?



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169987537
  
    I added two small comments in-line. 
    But there is another thing that came to my mind. A `Table` can be constructed from any type of `DataSet` not just a `DataSource`. So, it is possible that the input of a query is a partial DataSet program. In that case, the Explain output would include the DataSet program as well as the Table query. I am not sure if that is actually desirable. Do you think it is possible to limit the Explain output to the current query only? Or would you be in favor of showing the whole plan?



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r48752174
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java ---
    @@ -129,6 +129,8 @@ public boolean isPrintingStatusDuringExecution() {
     	 */
     	public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
     
    +	public abstract String getOptimizerPlanContext(Plan plan, boolean extended) throws Exception;
    --- End diff --
    
    If we reuse the JSON string (as proposed in my comment at `ExecutionEnvironment`) we do not need this method either.
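
A minimal sketch of the idea in this comment, not the PR's actual code: if the explain text is derived from the JSON string that `getOptimizerPlanAsJSON` already returns, the formatting can live in a plain helper and the executor interface needs no second plan-producing method. `ExplainFromJsonSketch`, `explain`, the `Supplier` stand-in for the executor, and the formatter are all hypothetical names used for illustration; only `getOptimizerPlanAsJSON` comes from the diff. The real method throws a checked exception, which the stand-in omits for brevity.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class ExplainFromJsonSketch {

    // jsonPlan is a hypothetical stand-in for a call such as
    // executor.getOptimizerPlanAsJSON(plan); formatter turns that JSON
    // into whatever text the explain output should show.
    static String explain(Supplier<String> jsonPlan, Function<String, String> formatter) {
        String json = jsonPlan.get();
        return formatter.apply(json);
    }

    public static void main(String[] args) {
        // Placeholder JSON, not a real Flink optimizer plan.
        Supplier<String> fakePlan = () -> "{\"nodes\":[]}";
        System.out.println(explain(fakePlan, json -> "plan: " + json));
    }
}
```

With this shape, only the formatter has to change if the explain output changes; the plan-producing side stays as it is.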



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49096445
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/Nodes.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.parser;
    +
    +import java.util.List;
    +
    +public class Nodes {
    --- End diff --
    
    Rename this class from `Nodes` to `Node`.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-171054949
  
    Merging this PR



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169967055
  
    @ChengXiangLi @fhueske @rmetzger, thanks a lot for your suggestions. The code has been updated.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1477#issuecomment-169532718
  
    @fhueske, the code is finished. I have dropped the previous plan-generator method and written a new parser named `PlanJsonParser` that parses the existing JSON plan. Could you help review it? :)



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49161542
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.explain;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.DeserializationFeature;
    +
    +import java.io.PrintWriter;
    +import java.io.StringWriter;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +
    +public class PlanJsonParser {
    +	
    +	public String getSqlExecutionPlan(String t, boolean extended) throws Exception{
    --- End diff --
    
    This method does not depend on any internal state, so it would be better to make it a static method.
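
For illustration, a stateless helper written as a static method might look like the sketch below. `PlanTextSketch` and `indent` are hypothetical names, not part of the PR; the point is only that callers need no instance when a method reads nothing but its arguments.

```java
public final class PlanTextSketch {

    // No instances: the class holds no state.
    private PlanTextSketch() {}

    // Prefixes every line of text with level tab characters.
    public static String indent(String text, int level) {
        StringBuilder prefix = new StringBuilder();
        for (int i = 0; i < level; i++) {
            prefix.append('\t');
        }
        StringBuilder out = new StringBuilder();
        String[] lines = text.split("\n", -1);
        for (int i = 0; i < lines.length; i++) {
            if (i > 0) {
                out.append('\n');
            }
            out.append(prefix).append(lines[i]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Called on the class itself; no object is created.
        System.out.println(PlanTextSketch.indent("Stage 1\ncontent", 1));
    }
}
```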



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r48751391
  
    --- Diff: flink-staging/flink-scala-shell/pom.xml ---
    @@ -76,6 +76,12 @@ under the License.
     			<version>${scala.version}</version>
     		</dependency>
     
    +        <dependency>
    --- End diff --
    
    Why do we need this dependency?



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49161417
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -94,6 +94,12 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     
    +        <dependency> 
    --- End diff --
    
    The indentation does not seem to match that of the previous dependency.



[GitHub] flink pull request: [FLINK-3192] Add explain support to print ast ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1477#discussion_r49183832
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.explain;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.DeserializationFeature;
    +
    +import java.io.PrintWriter;
    +import java.io.StringWriter;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +
    +public class PlanJsonParser {
    +	
    +	public static String getSqlExecutionPlan(String t, boolean extended) throws Exception{
    --- End diff --
    
    Add a space after `Exception`.

