Posted to commits@flink.apache.org by al...@apache.org on 2015/04/03 10:34:56 UTC

[2/4] flink git commit: [FLINK-1788] [table] Make logical plans transformable

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
index b0e2d05..ba376f5 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime
 
 import org.apache.flink.api.table.codegen.GenerateUnaryPredicate
-import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.configuration.Configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
index f0f5636..f5616d3 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.table.runtime
 
-import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
 import org.apache.flink.api.common.functions.RichFlatJoinFunction
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler,

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
index 0a2830b..16e256a 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.table.runtime
 
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
deleted file mode 100644
index 6302572..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
-
-import scala.language.postfixOps
-
-
-abstract class Expression extends Product {
-  def children: Seq[Expression]
-  def name: String = Expression.freshName("expression")
-  def typeInfo: TypeInformation[_]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  def fastEquals(other: Expression): Boolean = this.eq(other) || this == other
-
-  def transformPre(rule: PartialFunction[Expression, Expression]): Expression = {
-    val afterTransform = rule.applyOrElse(this, identity[Expression])
-
-    if (afterTransform fastEquals this) {
-      this.transformChildrenPre(rule)
-    } else {
-      afterTransform.transformChildrenPre(rule)
-    }
-  }
-
-  def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: Expression if children.contains(child) =>
-        val newChild = child.transformPre(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def transformPost(rule: PartialFunction[Expression, Expression]): Expression = {
-    val afterChildren = transformChildrenPost(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[Expression])
-    } else {
-      rule.applyOrElse(afterChildren, identity[Expression])
-    }
-  }
-
-  def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: Expression if children.contains(child) =>
-        val newChild = child.transformPost(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-    // toArray forces evaluation, toSeq does not seem to work here
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def exists(predicate: Expression => Boolean): Boolean = {
-    var exists = false
-    this.transformPre {
-      case e: Expression => if (predicate(e)) {
-        exists = true
-      }
-        e
-    }
-    exists
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used during transformation
-   * if children change. This must be overridden by Expressions that don't have the Constructor
-   * arguments in the same order as the `children`.
-   */
-  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val defaultCtor =
-      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
-    try {
-      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
-    } catch {
-      case iae: IllegalArgumentException =>
-        println("IAE " + this)
-        throw new RuntimeException("Should never happen.")
-    }
-  }
-}
-
-abstract class BinaryExpression() extends Expression {
-  def left: Expression
-  def right: Expression
-  def children = Seq(left, right)
-}
-
-abstract class UnaryExpression() extends Expression {
-  def child: Expression
-  def children = Seq(child)
-}
-
-abstract class LeafExpression() extends Expression {
-  val children = Nil
-}
-
-case class NopExpression() extends LeafExpression {
-  val typeInfo = new NothingTypeInfo()
-  override val name = Expression.freshName("nop")
-
-}
-
-object Expression {
-  def freshName(prefix: String): String = {
-    s"$prefix-${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-}

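For orientation: the transform machinery defined in this (now deleted) Expression class is the heart of FLINK-1788, and it reappears below in generic form as trees/TreeNode.scala. A minimal sketch of how it is used, written against the old `tree` package shown above (after this commit the same case classes live under `expressions`):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.tree.{Expression, Literal, Plus}

    // Fold `Plus` of two Int literals. Post-order rewrites children before
    // their parent, so nested sums collapse in a single traversal.
    val foldIntPlus: PartialFunction[Expression, Expression] = {
      case Plus(Literal(a: Int, _), Literal(b: Int, _)) =>
        Literal(a + b, BasicTypeInfo.INT_TYPE_INFO)
    }

    val expr = Plus(Plus(Literal(1), Literal(2)), Literal(3))
    expr.transformPost(foldIntPlus)  // == Literal(6, INT_TYPE_INFO)

Note the pre/post asymmetry: `transformPre` matches the root before its children are rewritten, so on the tree above it would only fold the inner sum and return Plus(Literal(3), Literal(3)).
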
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
deleted file mode 100644
index e5cdac5..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.aggregation.Aggregations
-
-
-abstract sealed class Aggregation extends UnaryExpression {
-  def typeInfo = {
-    child.typeInfo match {
-      case BasicTypeInfo.LONG_TYPE_INFO => // ok
-      case BasicTypeInfo.INT_TYPE_INFO =>
-      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
-      case BasicTypeInfo.FLOAT_TYPE_INFO =>
-      case BasicTypeInfo.BYTE_TYPE_INFO =>
-      case BasicTypeInfo.SHORT_TYPE_INFO =>
-      case _ =>
-      throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
-        s"aggregation $this. Only numeric data types supported.")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"Aggregate($child)"
-
-  def getIntermediateFields: Seq[Expression]
-  def getFinalField(inputs: Seq[Expression]): Expression
-  def getAggregations: Seq[Aggregations]
-}
-
-case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"($child).sum"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-}
-
-case class Min(child: Expression) extends Aggregation {
-  override def toString = s"($child).min"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MIN)
-
-}
-
-case class Max(child: Expression) extends Aggregation {
-  override def toString = s"($child).max"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MAX)
-}
-
-case class Count(child: Expression) extends Aggregation {
-  override def typeInfo = {
-    child.typeInfo match {
-      case _ => // we can count anything... :D
-    }
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
-  override def toString = s"($child).count"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-
-}
-
-case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"($child).avg"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
-  // This is just sweet. Use our own AST representation and let the code generator do
-  // our dirty work.
-  override def getFinalField(inputs: Seq[Expression]): Expression =
-    Div(inputs(0), inputs(1))
-  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
-}

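The Avg decomposition above is worth unpacking: `getIntermediateFields` emits the pair (child, Literal(1)), `getAggregations` sums both components, and `getFinalField` divides the two partial sums with the tree's own Div node, so the code generator produces the division for free. A hand evaluation in plain Scala (illustrative only, not runtime code):

    val values = Seq(3.0, 4.0, 5.0)

    // getIntermediateFields: Seq(child, Literal(1)), one (value, 1) pair per record
    val intermediates = values.map(v => (v, 1))

    // getAggregations: Seq(SUM, SUM), component-wise sums
    val (sum, count) = intermediates.reduce((a, b) => (a._1 + b._1, a._2 + b._2))

    // getFinalField: Div(inputs(0), inputs(1))
    sum / count  // 4.0

Count uses the same trick: it sums a constant Literal(1) per record, which is why its `getAggregations` is SUM as well.
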
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
deleted file mode 100644
index 84f9b18..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
-
-abstract class BinaryArithmetic extends BinaryExpression {
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-}
-
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-
-  override def toString = s"($left + $right)"
-}
-
-case class UnaryMinus(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"-($child)"
-}
-
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left - $right)"
-}
-
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left / $right)"
-}
-
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
-}
-
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left % $right)"
-}
-
-case class Abs(child: Expression) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"abs($child)"
-}
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression {
-  def typeInfo: TypeInformation[_] = {
-    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      left.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left & $right)"
-}
-
-case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left | $right)"
-}
-
-
-case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left ^ $right)"
-}
-
-case class BitwiseNot(child: Expression) extends UnaryExpression {
-  def typeInfo: TypeInformation[_] = {
-    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      child.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-
-  override def toString = s"~($child)"
-}
-

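Type checking in these classes is lazy: nothing is validated at construction time, only when `typeInfo` is forced. A few examples against the classes above (illustrative; exception messages paraphrased):

    import org.apache.flink.api.table.tree.{Literal, Plus}

    Plus(Literal(1), Literal(2)).typeInfo
    // INT_TYPE_INFO: both operands numeric and of the same type

    Plus(Literal("a"), Literal("b")).typeInfo
    // STRING_TYPE_INFO: Plus alone also accepts Strings (concatenation)

    Plus(Literal(1), Literal(2L)).typeInfo
    // throws ExpressionException: differing operand data types

There is no implicit widening: an Int/Long mix is rejected rather than promoted, so a cast has to be inserted explicitly (see cast.scala below).
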
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
deleted file mode 100644
index a3acc35..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
-  def typeInfo = tpe
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
deleted file mode 100644
index e0a34a9..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression {
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left === $right"
-}
-
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left !== $right"
-}
-
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left > $right"
-}
-
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left >= $right"
-}
-
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left < $right"
-}
-
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left <= $right"
-}
-
-case class IsNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNull"
-}
-
-case class IsNotNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNotNull"
-}

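Note the asymmetry above: EqualTo and NotEqualTo override `typeInfo` so that any two operands of matching type may be compared, while the ordering comparisons inherit the numeric-only check from BinaryComparison. Illustrative:

    import org.apache.flink.api.table.tree.{EqualTo, GreaterThan, Literal}

    EqualTo(Literal("a"), Literal("b")).typeInfo
    // BOOLEAN_TYPE_INFO: equality only needs matching operand types

    GreaterThan(Literal("a"), Literal("b")).typeInfo
    // throws ExpressionException: non-numeric operand
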
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
deleted file mode 100644
index cc42148..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
-  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
-  override def toString = "\"" + name
-}
-
-case class ResolvedFieldReference(
-    override val name: String,
-    tpe: TypeInformation[_]) extends LeafExpression {
-  def typeInfo = tpe
-
-  override def toString = s"'$name"
-}
-
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"$child as '$name"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
deleted file mode 100644
index 852d5a1..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.table.ImplicitExpressionOperations
-
-object Literal {
-  def apply(l: Any): Literal = l match {
-    case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
-    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
-    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-  }
-}
-
-case class Literal(value: Any, tpe: TypeInformation[_])
-  extends LeafExpression with ImplicitExpressionOperations {
-  def expr = this
-  def typeInfo = tpe
-
-  override def toString = s"$value"
-}

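Literal.apply infers the TypeInformation from the runtime value, which is what allows bare constants like Literal(1) in the aggregation code above. The match is not exhaustive, so an unsupported value type (a BigDecimal, say) surfaces as a scala.MatchError rather than an ExpressionException:

    Literal(42)     // Literal(42, INT_TYPE_INFO)
    Literal(3.14)   // Literal(3.14, DOUBLE_TYPE_INFO)
    Literal("text") // Literal("text", STRING_TYPE_INFO)
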
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
deleted file mode 100644
index 8ab838d..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression {
-  def typeInfo = {
-    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class Not(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override val name = Expression.freshName("not-" + child.name)
-
-  override def toString = s"!($child)"
-}
-
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left && $right"
-
-  override val name = Expression.freshName(left.name + "-and-" + right.name)
-}
-
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left || $right"
-
-  override val name = Expression.freshName(left.name + "-or-" + right.name)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
deleted file mode 100644
index caac402..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object tree

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
deleted file mode 100644
index e14374f..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
-    str: Expression,
-    beginIndex: Expression,
-    endIndex: Expression) extends Expression {
-  def typeInfo = {
-    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
-      throw new ExpressionException(
-        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
-    }
-    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
-    }
-    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
-    }
-
-    BasicTypeInfo.STRING_TYPE_INFO
-  }
-
-  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
-  override def toString = s"($str).substring($beginIndex, $endIndex)"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
new file mode 100644
index 0000000..87051cf
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to
+ * provide the chain of rules that are invoked one after another. The tree resulting
+ * from one rule is fed into the next rule and the final result is returned from method `analyze`.
+ */
+abstract class Analyzer[A <: TreeNode[A]] {
+
+  def rules: Seq[Rule[A]]
+
+  final def analyze(expr: A): A = {
+    var currentTree = expr
+    for (rule <- rules) {
+      var running = true
+      while (running) {
+        val newTree = rule(currentTree)
+        if (newTree fastEquals currentTree) {
+          running = false
+        }
+        currentTree = newTree
+      }
+    }
+    currentTree
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
new file mode 100644
index 0000000..b8a27cb
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `rule` gets a tree
+ * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]]
+ * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain.
+ *
+ * A [[Rule]] is repeatedly applied to a tree until the tree does not change between
+ * rule applications.
+ */
+abstract class Rule[A <: TreeNode[A]] {
+  def apply(expr: A): A
+}

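Because `Analyzer.analyze` re-applies each rule until the result `fastEquals` its input, a Rule must be convergent: once there is nothing left to rewrite it has to return the tree unchanged, or the loop never terminates. A common way to write one (a sketch, not part of this commit) is to wrap a partial function in the TreeNode traversal defined below:

    import org.apache.flink.api.table.trees.{Rule, TreeNode}

    // Hypothetical helper: lifts a PartialFunction into a post-order Rule.
    class TransformRule[A <: TreeNode[A]](pf: PartialFunction[A, A]) extends Rule[A] {
      def apply(expr: A): A = expr.transformPost(pf)
    }
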
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
new file mode 100644
index 0000000..84f1d7e
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ */
+abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  def transformPre(rule: PartialFunction[A, A]): A = {
+    val afterTransform = rule.applyOrElse(this, identity[A])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPre(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def transformPost(rule: PartialFunction[A, A]): A = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPost(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+    // toArray forces evaluation, toSeq does not seem to work here
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def exists(predicate: A => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: A => if (predicate(e)) {
+        exists = true
+      }
+        e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change. This must be overridden by tree nodes that don't have the Constructor
+   * arguments in the same order as the `children`.
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    val defaultCtor =
+      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        throw new RuntimeException(
+          s"Failed to copy tree node $this with new args $newArgs.", iae)
+    }
+  }
+}
+

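Putting the three pieces together: TreeNode supplies the traversal, Rule the rewrite step, and Analyzer the fixpoint loop. An end-to-end sketch with a hypothetical two-node tree (none of these names appear in the commit):

    import org.apache.flink.api.table.trees.{Analyzer, Rule, TreeNode}

    sealed abstract class Expr extends TreeNode[Expr] with Product
    case class Num(i: Int) extends Expr { val children: Seq[Expr] = Nil }
    case class Add(left: Expr, right: Expr) extends Expr {
      val children = Seq(left, right)
    }

    // Folds constant additions bottom-up; returns its input once nothing
    // matches, so the Analyzer's fixpoint loop terminates.
    object FoldAdd extends Rule[Expr] {
      def apply(expr: Expr): Expr = expr.transformPost {
        case Add(Num(a), Num(b)) => Num(a + b)
      }
    }

    object ConstantFolder extends Analyzer[Expr] {
      val rules = Seq(FoldAdd)
    }

    ConstantFolder.analyze(Add(Add(Num(1), Num(2)), Num(3)))  // == Num(6)
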
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
index 7ffa91c..db3c881 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
deleted file mode 100644
index 604bdcf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.tree.Literal
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.examples.java.graph.util.PageRankData
-import org.apache.flink.util.Collector
-
-import _root_.scala.collection.JavaConverters._
-
-/**
-* A basic implementation of the Page Rank algorithm using a bulk iteration.
-*
-* This implementation requires a set of pages and a set of directed links as input and works as
-* follows.
-*
-* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
-* page collects the partial ranks of all pages that point to it, sums them up, and applies a
-* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
-* with the new ranks of all pages. This implementation terminates after a fixed number of
-* iterations. This is the Wikipedia entry for the
-* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
-*
-* Input files are plain text files and must be formatted as follows:
-*
-*  - Pages are represented as (long) IDs, separated by new-line characters.
-*    For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
-*  - Links are represented as pairs of page IDs which are separated by space characters. Links
-*    are separated by new-line characters.
-*    For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12),
-*    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
-*    at least one incoming and one outgoing link (a page can point to itself).
-*
-* Usage:
-* {{{
-*   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
-* }}}
-*
-* If no parameters are provided, the program is run with default data from
-* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
-*
-* This example shows how to use:
-*
-*  - Bulk Iterations
-*  - Table API expressions
-*/
-object PageRankExpression {
-
-  private final val DAMPENING_FACTOR: Double = 0.85
-  private final val EPSILON: Double = 0.0001
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // read input data
-    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
-      .as('pageId, 'rank)
-
-    val links = getLinksDataSet(env)
-
-    // build adjacency list from link input
-    val adjacencyLists = links
-      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
-
-        override def reduce(
-            values: _root_.java.lang.Iterable[Link],
-            out: Collector[AdjacencyList]): Unit = {
-          var outputId = -1L
-          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
-          out.collect(new AdjacencyList(outputId, outputList.toArray))
-        }
-
-      }).as('sourceId, 'targetIds)
-
-    // start iteration
-    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
-      currentRanks =>
-        val newRanks = currentRanks.toTable
-          // distribute ranks to target pages
-          .join(adjacencyLists).where('pageId === 'sourceId)
-          .select('rank, 'targetIds).as[RankOutput]
-          .flatMap {
-            (in, out: Collector[(Long, Double)]) =>
-              val targets = in.targetIds
-              val len = targets.length
-              targets foreach { t => out.collect((t, in.rank / len )) }
-          }
-          .as('pageId, 'rank)
-          // collect ranks and sum them up
-          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
-          // apply dampening factor
-          .select(
-            'pageId,
-            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
-
-
-        val termination = currentRanks.toTable
-          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
-          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
-
-        (newRanks, termination)
-    }
-
-    val result = finalRanks
-
-    // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", " ")
-    } else {
-      result.print()
-    }
-
-    // execute program
-    env.execute("Expression PageRank Example")
-  }
-
-  // *************************************************************************
-  //     USER TYPES
-  // *************************************************************************
-
-  case class Link(sourceId: Long, targetId: Long)
-
-  case class Page(pageId: Long, rank: Double)
-
-  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
-  case class RankOutput(rank: Double, targetIds: Array[Long])
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 5) {
-        pagesInputPath = args(0)
-        linksInputPath = args(1)
-        outputPath = args(2)
-        numPages = args(3).toLong
-        maxIterations = args(4).toInt
-      } else {
-        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-          "pages> <num iterations>")
-        false
-      }
-    } else {
-      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
-        "default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-        "pages> <num iterations>")
-
-      numPages = PageRankData.getNumberOfPages
-    }
-    true
-  }
-
-  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
-    if (fileOutput) {
-      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
-        .map(x => x._1)
-    } else {
-      env.generateSequence(1, 15)
-    }
-  }
-
-  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
-    if (fileOutput) {
-      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-    } else {
-      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
-        v2.asInstanceOf[Long])}
-      env.fromCollection(edges)
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var pagesInputPath: String = null
-  private var linksInputPath: String = null
-  private var outputPath: String = null
-  private var numPages: Double = 0
-  private var maxIterations: Int = 10
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
new file mode 100644
index 0000000..c0e0f6c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+* with the new ranks of all pages. This implementation terminates after a fixed number of
+* iterations, or earlier once no rank changes by more than a small threshold. See the
+* Wikipedia entry for the [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]].
+*
+* Input files are plain text files and must be formatted as follows:
+*
+*  - Pages are represented by a (long) ID, separated by new-line characters.
+*    For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
+*  - Links are represented as pairs of page IDs which are separated by space characters. Links
+*    are separated by new-line characters.
+*    For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12),
+*    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+*    at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+*   PageRankTable <pages path> <links path> <output path> <num pages> <num iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+*  - Bulk Iterations
+*  - Table API expressions
+*/
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      result.print()
+    }
+
+    // execute program
+    env.execute("Expression PageRank Example")
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+    }
+    true
+  }
+
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map {
+        case Array(v1, v2) => Link(v1.asInstanceOf[Long], v2.asInstanceOf[Long])
+      }
+      env.fromCollection(edges)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}
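
For readers who want the update rule that the Table expressions above encode, here is a minimal
plain-Scala sketch of one PageRank iteration over in-memory collections. It is a standalone
illustration with hypothetical names (PageRankSketch, iterate), not part of this patch:

  object PageRankSketch {
    val DampeningFactor = 0.85

    // One iteration: every page sends rank / outDegree to each of its targets,
    // then the damped sum becomes the new rank:
    //   newRank = d * sum(partials) + (1 - d) / numPages
    def iterate(ranks: Map[Long, Double], adjacency: Map[Long, Seq[Long]]): Map[Long, Double] = {
      val numPages = ranks.size
      val partials = for {
        (page, targets) <- adjacency.toSeq
        target <- targets
      } yield target -> ranks(page) / targets.size
      partials.groupBy(_._1).map { case (page, contribs) =>
        page -> (DampeningFactor * contribs.map(_._2).sum + (1 - DampeningFactor) / numPages)
      }
    }
  }

The termination dataset in the example corresponds to stopping once no page's rank changes by more
than EPSILON between two consecutive iterations.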

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
deleted file mode 100644
index 0ff97bf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.streaming.api.scala._
-
-import org.apache.flink.api.scala.table._
-
-import scala.Stream._
-import scala.math._
-import scala.language.postfixOps
-import scala.util.Random
-
-/**
- * Simple example for demonstrating the use of the Table API with Flink Streaming.
- */
-object StreamingExpressionFilter {
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val cars = genCarStream().toTable
-      .filter('carId === 0)
-      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
-      .as[CarEvent]
-
-    cars.print()
-
-    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
-  }
-
-  def genCarStream(): DataStream[CarEvent] = {
-
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
-    }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-        true
-      }
-      else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
-        false
-      }
-    }else{
-      true
-    }
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..4aa5653
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming.
+ */
+object StreamingTableFilter {
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val cars = genCarStream().toTable
+      .filter('carId === 0)
+      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+      .toStream[CarEvent]
+
+    cars.print()
+
+    StreamExecutionEnvironment.getExecutionEnvironment.execute("StreamingTableFilter")
+
+  }
+
+  def genCarStream(): DataStream[CarEvent] = {
+
+    def nextSpeed(carEvent: CarEvent): CarEvent = {
+      val next =
+        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis)
+    }
+
+    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+
+    carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis())))
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      } else {
+        System.err.println("Usage: StreamingTableFilter <numCars> <evictSec> <triggerMeters>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}
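
The Table expressions in this example (filter on carId, shift the distance, wrap the timestamp)
have a direct plain-Scala reading. A minimal sketch over an in-memory sequence, using a
hypothetical CarFilterSketch object that is not part of this patch:

  object CarFilterSketch {
    case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

    // Mirrors .filter('carId === 0)
    //         .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
    def filterAndProject(events: Seq[CarEvent]): Seq[CarEvent] =
      events
        .filter(_.carId == 0)
        .map(e => e.copy(distance = e.distance + 1000, time = e.time % 5))

    def main(args: Array[String]): Unit = {
      val events = Seq(CarEvent(0, 55, 10.0, 7L), CarEvent(1, 60, 12.0, 8L))
      filterAndProject(events).foreach(println)
      // prints: CarEvent(0,55,1010.0,2)
    }
  }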

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
deleted file mode 100644
index 96ec4ba..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT 
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- * WHERE
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at 
- * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
- *
- * Usage: 
- * {{{
- * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - Table API expressions
- *
- */
-object TPCHQuery3Expression {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set filter date
-    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-    
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val lineitems = getLineitemDataSet(env)
-      .filter( l => dateFormat.parse(l.shipDate).after(date) )
-      .as('id, 'extdPrice, 'discount, 'shipDate)
-
-    val customers = getCustomerDataSet(env)
-      .as('id, 'mktSegment)
-      .filter( 'mktSegment === "AUTOMOBILE" )
-
-    val orders = getOrdersDataSet(env)
-      .filter( o => dateFormat.parse(o.orderDate).before(date) )
-      .as('orderId, 'custId, 'orderDate, 'shipPrio)
-
-    val items =
-      orders.join(customers)
-        .where('custId === 'id)
-        .select('orderId, 'orderDate, 'shipPrio)
-      .join(lineitems)
-        .where('orderId === 'id)
-        .select(
-          'orderId,
-          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
-          'orderDate,
-          'shipPrio)
-
-    val result = items
-      .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
-
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-
-    // execute program
-    env.execute("Scala TPCH Query 3 (Expression) Example")
-  }
-  
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-  
-  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Customer(id: Long, mktSegment: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>");
-      false
-    }
-  }
-  
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 10) )
-  }
-
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
-    env.readCsvFile[Customer](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 6) )
-  }
-  
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4, 7) )
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields with the Table API's as() clause.
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage: 
+ * {{{
+ * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+    
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+  
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+  
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+  
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
+                             "<orders-csv path> <result path>");
+      false
+    }
+  }
+  
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+  
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+  
+}
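
The core of the SQL above is the revenue aggregate SUM(l_extendedprice * (1 - l_discount)) per
group. A minimal plain-Scala sketch of that aggregation over in-memory data, using a hypothetical
RevenueSketch object that is not part of this patch (grouping only by orderId for brevity):

  object RevenueSketch {
    case class Item(orderId: Long, extdPrice: Double, discount: Double)

    // Mirrors SUM(l_extendedprice * (1 - l_discount)) AS revenue ... GROUP BY l_orderkey
    def revenuePerOrder(items: Seq[Item]): Map[Long, Double] =
      items.groupBy(_.orderId).map { case (orderId, group) =>
        orderId -> group.map(i => i.extdPrice * (1.0 - i.discount)).sum
      }

    def main(args: Array[String]): Unit = {
      val items = Seq(Item(1L, 100.0, 0.05), Item(1L, 50.0, 0.0), Item(2L, 80.0, 0.1))
      revenuePerOrder(items).foreach { case (id, rev) => println(s"order $id -> revenue $rev") }
    }
  }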

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 0b2a5df..60fb984 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -85,10 +85,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
+		Table table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
 
-		Table<JavaBatchTranslator> result =
-				table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
 		ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
@@ -103,10 +102,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
 
-		Table<JavaBatchTranslator> result =
+		Table result =
 				table.select("foo.avg");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -127,10 +126,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
 						new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table<JavaBatchTranslator> result =
+		Table result =
 				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
 
 		DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -151,10 +150,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 						new Tuple2<Float, String>(1f, "Hello"),
 						new Tuple2<Float, String>(2f, "Ciao"));
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table<JavaBatchTranslator> result =
+		Table result =
 				table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
 
 
@@ -174,10 +173,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f,
 				"Hello"));
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table<JavaBatchTranslator> result =
+		Table result =
 				table.select("f1.sum");
 
 
@@ -196,10 +195,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(input);
 
-		Table<JavaBatchTranslator> result =
+		Table result =
 				table.select("f0.sum.sum");
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index ee877e9..6ec3187 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -65,7 +65,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -86,7 +86,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -102,7 +102,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -118,7 +118,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -134,7 +134,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 
 		DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -150,7 +150,7 @@ public class AsITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		Table<JavaBatchTranslator> table =
+		Table table =
 				tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
 						" c");