You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:36 UTC

[07/12] flink git commit: [FLINK-1623] Rename Expression API to Table API

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
deleted file mode 100644
index e3f62f8..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
-
-abstract class BinaryArithmetic extends BinaryExpression {
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-}
-
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-
-  override def toString = s"($left + $right)"
-}
-
-case class UnaryMinus(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"-($child)"
-}
-
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left - $right)"
-}
-
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left / $right)"
-}
-
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
-}
-
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left % $right)" // modulo; "*" is Mul's rendering
-}
-
-case class Abs(child: Expression) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"abs($child)"
-}
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression {
-  def typeInfo: TypeInformation[_] = {
-    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      left.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left & $right)"
-}
-
-case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left | $right)"
-}
-
-
-case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left ^ $right)"
-}
-
-case class BitwiseNot(child: Expression) extends UnaryExpression {
-  def typeInfo: TypeInformation[_] = {
-    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      child.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-
-  override def toString = s"~($child)"
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
deleted file mode 100644
index f83de5b..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
-  def typeInfo = tpe
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
deleted file mode 100644
index fdb5fd0..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression {
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left === $right"
-}
-
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left !== $right"
-}
-
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left > $right"
-}
-
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left >= $right"
-}
-
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left < $right"
-}
-
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left <= $right"
-}
-
-case class IsNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNull"
-}
-
-case class IsNotNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNotNull"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
deleted file mode 100644
index a1d8589..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
-  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
-  override def toString = "\"" + name
-}
-
-case class ResolvedFieldReference(
-    override val name: String,
-    tpe: TypeInformation[_]) extends LeafExpression {
-  def typeInfo = tpe
-
-  override def toString = s"'$name"
-}
-
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"$child as '$name"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
deleted file mode 100644
index 03949ee..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.expressions.ImplicitExpressionOperations
-
-object Literal {
-  def apply(l: Any): Literal = l match {
-    case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
-    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
-    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-  }
-}
-
-case class Literal(value: Any, tpe: TypeInformation[_])
-  extends LeafExpression with ImplicitExpressionOperations {
-  def expr = this
-  def typeInfo = tpe
-
-  override def toString = s"$value"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
deleted file mode 100644
index 8f0a068..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression {
-  def typeInfo = {
-    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class Not(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override val name = Expression.freshName("not-" + child.name)
-
-  override def toString = s"!($child)"
-}
-
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left && $right"
-
-  override val name = Expression.freshName(left.name + "-and-" + right.name)
-}
-
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left || $right"
-
-  override val name = Expression.freshName(left.name + "-or-" + right.name)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
deleted file mode 100644
index 04c29f7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object tree

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
deleted file mode 100644
index 175d445..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
-    str: Expression,
-    beginIndex: Expression,
-    endIndex: Expression) extends Expression {
-  def typeInfo = {
-    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
-      throw new ExpressionException(
-        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
-    }
-    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
-    }
-    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
-    }
-
-    BasicTypeInfo.STRING_TYPE_INFO
-  }
-
-  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
-  override def toString = s"($str).substring($beginIndex, $endIndex)"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
deleted file mode 100644
index 38c908d..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.operators.SingleInputOperator
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
- * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
- * disappears since the translation method simply returns the input.
- */
-class RenameOperator[T](
-    input: JavaDataSet[T],
-    renamingTypeInformation: RenamingProxyTypeInfo[T])
-  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
-
-  override protected def translateToDataFlow(
-      input: Operator[T]): Operator[T] = input
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
deleted file mode 100644
index 0263f8a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
-
-/**
- * A TypeInformation that is used to rename fields of an underlying CompositeType. This
- * allows the system to translate "as" expression operations to a [[RenameOperator]]
- * that does not get translated to a runtime operator.
- */
-class RenamingProxyTypeInfo[T](
-    tpe: CompositeType[T],
-    fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) {
-
-  def getUnderlyingType: CompositeType[T] = tpe
-
-  if (tpe.getArity != fieldNames.length) {
-    throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
-      s"number of fields in underlying type $tpe do not match.")
-  }
-
-  if (fieldNames.toSet.size != fieldNames.length) {
-    throw new IllegalArgumentException(s"New field names must be unique. " +
-      s"Names: ${fieldNames.mkString(",")}.")
-  }
-
-  override def getFieldIndex(fieldName: String): Int = {
-    val result = fieldNames.indexOf(fieldName)
-    if (result != fieldNames.lastIndexOf(fieldName)) {
-      -2
-    } else {
-      result
-    }
-  }
-  override def getFieldNames: Array[String] = fieldNames
-
-  override def isBasicType: Boolean = tpe.isBasicType
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
-    tpe.createSerializer(executionConfig)
-
-  override def getArity: Int = tpe.getArity
-
-  override def isKeyType: Boolean = tpe.isKeyType
-
-  override def getTypeClass: Class[T] = tpe.getTypeClass
-
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
-
-  override def isTupleType: Boolean = tpe.isTupleType
-
-  override def toString = {
-    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
-      s"fields: ${fieldNames.mkString(", ")})"
-  }
-
-  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
-
-  override def getTotalFields: Int = tpe.getTotalFields
-
-  override def createComparator(
-        logicalKeyFields: Array[Int],
-        orders: Array[Boolean],
-        logicalFieldOffset: Int,
-        executionConfig: ExecutionConfig) =
-    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
-
-  // These are never called since we override create comparator
-  override protected def initializeNewComparator(localKeyCount: Int): Unit =
-    throw new RuntimeException("Cannot happen.")
-
-  override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] =
-    throw new RuntimeException("Cannot happen.")
-
-  override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit =
-    throw new RuntimeException("Cannot happen.")
-
-  override def getFlatFields(
-      fieldExpression: String,
-      offset: Int,
-      result: util.List[FlatFieldDescriptor]): Unit = {
-    tpe.getFlatFields(fieldExpression, offset, result)
-  }
-
-  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
-    tpe.getTypeAt(fieldExpression)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
deleted file mode 100644
index 006c0c9..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.expressions.Row
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
-;
-
-/**
- * Serializer for [[Row]].
- */
-class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
-  extends TypeSerializer[Row] {
-
-  override def isImmutableType: Boolean = false
-
-  override def getLength: Int = -1
-
-  override def duplicate = this
-
-  override def createInstance: Row = {
-    new Row(fieldSerializers.length)
-  }
-
-  override def copy(from: Row, reuse: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and from do not match.")
-    }
-    var i = 0
-    while (i < len) {
-      val reuseField = reuse.productElement(i)
-      val fromField = from.productElement(i).asInstanceOf[AnyRef]
-      val copy = fieldSerializers(i).copy(fromField, reuseField)
-      reuse.setField(i, copy)
-      i += 1
-    }
-    reuse
-  }
-
-  override def copy(from: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and from do not match.")
-    }
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i).asInstanceOf[AnyRef]
-      val copy = fieldSerializers(i).copy(fromField)
-      result.setField(i, copy)
-      i += 1
-    }
-    result
-  }
-
-  override def serialize(value: Row, target: DataOutputView) {
-    val len = fieldSerializers.length
-    var i = 0
-    while (i < len) {
-      val serializer = fieldSerializers(i)
-      serializer.serialize(value.productElement(i), target)
-      i += 1
-    }
-  }
-
-  override def deserialize(reuse: Row, source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    if (reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and fields do not match.")
-    }
-
-    var i = 0
-    while (i < len) {
-      val field = reuse.productElement(i).asInstanceOf[AnyRef]
-      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
-      i += 1
-    }
-    reuse
-  }
-
-  override def deserialize(source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      result.setField(i, fieldSerializers(i).deserialize(source))
-      i += 1
-    }
-    result
-  }
-
-  override def copy(source: DataInputView, target: DataOutputView): Unit = {
-    val len = fieldSerializers.length
-    var i = 0
-    while (i < len) {
-      fieldSerializers(i).copy(source, target)
-      i += 1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
deleted file mode 100644
index 92e9bc8..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.expressions.Row
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
-
-/**
- * TypeInformation for [[Row]].
- */
-class RowTypeInfo(
-    fieldTypes: Seq[TypeInformation[_]],
-    fieldNames: Seq[String])
-  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
-
-  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
-
-  if (fieldNames.toSet.size != fieldNames.size) {
-    throw new IllegalArgumentException("Field names must be unique.")
-  }
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
-    val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
-    for (i <- 0 until getArity) {
-      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
-        .asInstanceOf[TypeSerializer[Any]]
-    }
-
-    new RowSerializer(fieldSerializers)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
deleted file mode 100644
index ad7cfe4..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.expressions
-
-import org.apache.flink.api.expressions.ExpressionOperation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator
-import org.apache.flink.api.scala.expressions.JavaStreamingTranslator
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Convenience methods for creating an [[org.apache.flink.api.expressions.ExpressionOperation]]
- * and for converting an [[org.apache.flink.api.expressions.ExpressionOperation]] back
- * to a [[org.apache.flink.api.java.DataSet]] or
- * [[org.apache.flink.streaming.api.datastream.DataStream]].
- */
-object ExpressionUtil {
-  
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]].
-   * The fields of the DataSet type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   ExpressionUtil.from(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table where the fields
-   * are named a and b.
-   */
-  def from[T](set: DataSet[T], fields: String): ExpressionOperation[JavaBatchTranslator] = {
-    new JavaBatchTranslator().createExpressionOperation(set, fields)
-  }
-
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]].
-   * The fields of the DataSet type are used to name the
-   * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
-   */
-  def from[T](set: DataSet[T]): ExpressionOperation[JavaBatchTranslator] = {
-    new JavaBatchTranslator().createExpressionOperation(set)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.expressions.ExpressionOperation]].
-   * The fields of the DataStream type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   ExpressionUtil.from(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table where the fields
-   * are named a and b.
-   */
-  def from[T](set: DataStream[T], fields: String): ExpressionOperation[JavaStreamingTranslator] = {
-    new JavaStreamingTranslator().createExpressionOperation(set, fields)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.expressions.ExpressionOperation]].
-   * The fields of the DataStream type are used to name the
-   * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
-   */
-  def from[T](set: DataStream[T]): ExpressionOperation[JavaStreamingTranslator] = {
-    new JavaStreamingTranslator().createExpressionOperation(set)
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to
-   * a DataSet. The given type must have exactly the same fields as the
-   * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the
-   * fields and the types must match.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toSet[T](
-      op: ExpressionOperation[JavaBatchTranslator],
-      clazz: Class[T]): DataSet[T] = {
-    op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]]
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to
-   * a DataStream. The given type must have exactly the same fields as the
-   * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the
-   * fields and the types must match.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toStream[T](
-      op: ExpressionOperation[JavaStreamingTranslator], clazz: Class[T]): DataStream[T] = {
-    op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]]
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
deleted file mode 100644
index 567d19c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import org.apache.flink.api.scala._
-
-/**
- * Methods for converting a [[DataSet]] to an [[ExpressionOperation]]. A [[DataSet]] is
- * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.expressions]].
- */
-class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
-
-  /**
-   * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting
-   * expression operation can be specified like this:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val expr = in.as('a, 'b)
-   * }}}
-   *
-   * This results in an expression operation that has field `a` of type `String` and field `b`
-   * of type `Int`.
-   */
-  def as(fields: Expression*): ExpressionOperation[ScalaBatchTranslator] = {
-     new ScalaBatchTranslator().createExpressionOperation(set, fields.toArray)
-  }
-
-  /**
-   * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting
-   * expression operation will be taken from the field names of the input type:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val expr = in.toExpression
-   * }}}
-   *
-   * This results in an expression operation that has field `_1` of type `String` and field `_2`
-   * of type `Int`.
-   */
-  def toExpression: ExpressionOperation[ScalaBatchTranslator] = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
deleted file mode 100644
index 49dbce7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{Expression, UnresolvedFieldReference}
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.DataStream
-
-class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) {
-
-  /**
-   * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting
-   * expression operation can be specified like this:
-   *
-   * {{{
-   *   val in: DataStream[(String, Int)] = ...
-   *   val expr = in.as('a, 'b)
-   * }}}
-   *
-   * This results in an expression operation that has field `a` of type `String` and field `b`
-   * of type `Int`.
-   */
-
-  def as(fields: Expression*): ExpressionOperation[ScalaStreamingTranslator] = {
-     new ScalaStreamingTranslator().createExpressionOperation(set, fields.toArray)
-  }
-
-  /**
-   * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting
-   * expression operation will be taken from the field names of the input type:
-   *
-   * {{{
-   *   val in: DataStream[(String, Int)] = ...
-   *   val expr = in.toExpression
-   * }}}
-   *
-   * This results in an expression operation that has field `_1` of type `String` and field `_2`
-   * of type `Int`.
-   */
-
-  def toExpression: ExpressionOperation[ScalaStreamingTranslator] = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
deleted file mode 100644
index ae41ceb..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.expressions.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row}
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[JavaDataSet]]s and
- * translating them back to Java [[JavaDataSet]]s.
- */
-class JavaBatchTranslator extends OperationTranslator {
-
-  type Representation[A] = JavaDataSet[A]
-
-
-  def createExpressionOperation[A](
-    repr: JavaDataSet[A]): ExpressionOperation[JavaBatchTranslator] = {
-    val fields =
-      repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference)
-
-    createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]], false)
-  }
-
-  def createExpressionOperation[A](
-    repr: JavaDataSet[A],
-    expression: String): ExpressionOperation[JavaBatchTranslator] = {
-    val fields = ExpressionParser.parseExpressionList(expression)
-
-    createExpressionOperation(repr, fields.toArray)
-  }
-
-  def createExpressionOperation[A](
-      repr: JavaDataSet[A],
-      fields: Array[Expression],
-      checkDeterministicFields: Boolean = true): ExpressionOperation[JavaBatchTranslator] = {
-
-    // shortcut for DataSet[Row]
-    repr.getType match {
-      case rowTypeInfo: RowTypeInfo =>
-        val expressions = rowTypeInfo.getFieldNames map {
-          name => (name, rowTypeInfo.getTypeAt(name))
-        }
-        new ExpressionOperation(
-          Root(repr.asInstanceOf[JavaDataSet[Row]], expressions), this)
-      case _ =>
-    }
-
-    val clazz = repr.getType.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create expression Operation from DataSet of type " +
-        clazz.getName + ". Only top-level classes or static members classes " +
-        " are supported.")
-    }
-
-    if (!repr.getType.isInstanceOf[CompositeType[_]]) {
-      throw new ExpressionException("Only DataSets of composite type can be transformed to an" +
-        " Expression Operation. These would be tuples, case classes and POJOs.")
-    }
-
-    val inputType = repr.getType.asInstanceOf[CompositeType[A]]
-
-    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
-      throw new ExpressionException(s"You cannot rename fields upon Table creaton: " +
-          s"Field order of input type $inputType is not deterministic." )
-    }
-
-    if (fields.length != inputType.getFieldNames.length) {
-      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
-        "' and number of fields in input type " + inputType + " do not match.")
-    }
-
-    val newFieldNames = fields map {
-      case UnresolvedFieldReference(name) => name
-      case e =>
-        throw new ExpressionException("Only field references allowed in 'as' operation, " +
-          " offending expression: " + e)
-    }
-
-    if (newFieldNames.toSet.size != newFieldNames.size) {
-      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
-    }
-
-    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
-      case (name, index) => (name, inputType.getTypeAt(index))
-    }
-
-    val inputFields = inputType.getFieldNames
-    val fieldMappings = inputFields.zip(resultFields)
-    val expressions: Array[Expression] = fieldMappings map {
-      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
-    }
-
-    val rowDataSet = createSelect(expressions, repr, inputType)
-
-    new ExpressionOperation(Root(rowDataSet, resultFields), new JavaBatchTranslator)
-  }
-
-  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataSet of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "Expression operations can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same" +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(
-      resultSet.getType.asInstanceOf[RowTypeInfo],
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-    val operator = new MapOperator(resultSet, outputType, function, opName)
-
-    operator
-  }
-
-  private def translateInternal(op: Operation): JavaDataSet[Row] = {
-    op match {
-      case Root(dataSet: JavaDataSet[Row], resultFields) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op)
-
-      case GroupBy(_, fields) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
-        new RenameOperator(translatedInput, proxyType)
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(leftInput, rightInput) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          // no expansions took place
-          val translatedInput = translateInternal(input)
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          val inputFields = inType.getFieldNames
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val keyIndices = groupExpressions map {
-          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
-        }
-
-        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-
-        val grouping = new UnsortedGrouping(translatedInput, keys)
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          grouping,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-      case agg@Aggregate(input, aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          translatedInput,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter)
-    }
-  }
-
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: JavaDataSet[I],
-      inputType: CompositeType[I]): JavaDataSet[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-        }
-
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-    val operator = new MapOperator(input, resultType, function, opName)
-
-    operator
-  }
-
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: JavaDataSet[L],
-      rightInput: JavaDataSet[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): JavaDataSet[Row] = {
-
-    val resultType = new RowTypeInfo(fields)
-
-    val (reducedPredicate, leftFields, rightFields) =
-      ExtractEquiJoinFields(leftType, rightType, predicate)
-
-    if (leftFields.isEmpty || rightFields.isEmpty) {
-      throw new ExpressionException("Could not derive equi-join predicates " +
-        "for predicate " + predicate + ".")
-    }
-
-    val leftKey = new ExpressionKeys[L](leftFields, leftType)
-    val rightKey = new ExpressionKeys[R](rightFields, rightType)
-
-    val joiner = new ExpressionJoinFunction[L, R, Row](
-      reducedPredicate,
-      leftType,
-      rightType,
-      resultType,
-      fields)
-
-    new EquiJoin[L, R, Row](
-      leftInput,
-      rightInput,
-      leftKey,
-      rightKey,
-      joiner,
-      resultType,
-      joinHint,
-      predicate.toString)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
deleted file mode 100644
index 56c38af..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
-import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[DataStream]]s and
- * translating them back to Java [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. Also, the expression
- * operations must be extended to allow windowing operations.
- */
-
-class JavaStreamingTranslator extends OperationTranslator {
-
-  type Representation[A] = DataStream[A]
-
-
-  def createExpressionOperation[A](
-    repr: DataStream[A]): ExpressionOperation[JavaStreamingTranslator] = {
-    // Reference every field of the (composite) input type under its existing name.
-    val inputType = repr.getType.asInstanceOf[CompositeType[A]]
-    val fieldRefs = inputType.getFieldNames.map(name => UnresolvedFieldReference(name))
-
-    createExpressionOperation(repr, fieldRefs.toArray.asInstanceOf[Array[Expression]])
-  }
-
-  def createExpressionOperation[A](
-      repr: DataStream[A],
-      expression: String): ExpressionOperation[JavaStreamingTranslator] = {
-    // Parse the textual field list, then delegate to the overload taking expressions.
-    val parsedFields: Array[Expression] = ExpressionParser.parseExpressionList(expression).toArray
-    createExpressionOperation(repr, parsedFields)
-  }
-
-  /**
-   * Creates an [[ExpressionOperation]] from `repr` whose fields are named after `fields`.
-   *
-   * `fields` must hold exactly one unresolved field reference per field of the input type;
-   * the i-th reference is the new name of the i-th input field.
-   *
-   * A stream of [[Row]]s whose fields already carry the requested names is used as the root
-   * directly. Every other input is converted by a select that renames the fields.
-   *
-   * @param repr the stream to create the operation from
-   * @param fields the new field names, given as unresolved field references
-   * @throws ExpressionException if the element type is a non-static member class or not a
-   *                             composite type, if the number of fields does not match, if
-   *                             `fields` contains anything but field references, or if a
-   *                             name occurs more than once
-   */
-  def createExpressionOperation[A](
-      repr: DataStream[A],
-      fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] = {
-
-    val clazz = repr.getType.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create expression Operation from DataStream of " +
-        "type " + clazz.getName + ". Only top-level classes or static member classes " +
-        "are supported.")
-    }
-
-    val inputType = repr.getType match {
-      case composite: CompositeType[A] => composite
-      case _ =>
-        throw new ExpressionException("Only DataStreams of composite type can be transformed " +
-          "to an Expression Operation. These would be tuples, case classes and POJOs.")
-    }
-
-    val inputFields = inputType.getFieldNames
-
-    if (fields.length != inputFields.length) {
-      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
-        "' and number of fields in input type " + inputType + " do not match.")
-    }
-
-    val newFieldNames = fields map {
-      case UnresolvedFieldReference(name) => name
-      case e =>
-        throw new ExpressionException("Only field references allowed in 'as' operation, " +
-          "offending expression: " + e)
-    }
-
-    if (newFieldNames.toSet.size != newFieldNames.size) {
-      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
-    }
-
-    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
-      case (name, index) => (name, inputType.getTypeAt(index))
-    }
-
-    repr.getType match {
-      case _: RowTypeInfo if newFieldNames.sameElements(inputFields) =>
-        // Shortcut for DataStream[Row]: nothing is renamed, so no select is needed. The
-        // shortcut is taken after validation so that all error cases behave as before.
-        new ExpressionOperation(Root(repr.asInstanceOf[DataStream[Row]], resultFields), this)
-
-      case _ =>
-        // Select every input field under its new name to obtain a stream of Rows.
-        val expressions: Array[Expression] = inputFields.zip(resultFields) map {
-          case (oldName, (newName, tpe)) =>
-            Naming(ResolvedFieldReference(oldName, tpe), newName)
-        }
-
-        val rowStream = createSelect(expressions, repr, inputType)
-
-        new ExpressionOperation(Root(rowStream, resultFields), this)
-    }
-  }
-
-  /**
-   * Translates the operation tree `op` into a [[DataStream]] with elements of type `A`.
-   *
-   * When `A` is [[Row]] the translated stream is returned as it is. Otherwise `A` has to be
-   * a composite type with the same set of field names as the result of `op` and equal types
-   * for equally named fields; a final map operator then turns every [[Row]] into an `A`.
-   *
-   * @param op the operation tree to translate
-   * @param tpe type information of the requested element type
-   * @throws ExpressionException if `A` is a non-static member class or not a composite type,
-   *                             or if its fields differ in name or type from the result
-   */
-  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataStream[Row]
-      return translateInternal(op).asInstanceOf[DataStream[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataStream of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-    // Checked before the translation so that an unsupported type fails first.
-    val outputType = tpe match {
-      case composite: CompositeType[A] => composite
-      case _ =>
-        throw new ExpressionException(
-          "Expression operations can only be converted to composite types, type is: " +
-            tpe +
-            ". Composite types would be tuples, case classes and POJOs.")
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    // Select the result fields in the field order of the output type.
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(resultType, outputType, outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-
-    resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
-  }
-
-  /**
-   * Recursively translates an operation tree into a [[DataStream]] of [[Row]]s.
-   *
-   * Only `Root`, `Select` and `Filter` lead to a result. `GroupBy`, `As`, `Aggregate` and a
-   * `Join` without filter raise an [[ExpressionException]], and so does a filtered join,
-   * because `createJoin` is not implemented for streams.
-   *
-   * The order of the cases matters: the patterns that recognise a join must be tried before
-   * the generic `Select` and `Filter` cases.
-   */
-  private def translateInternal(op: Operation): DataStream[Row] = {
-    op match {
-      case Root(dataSet: DataStream[Row], _) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op)
-
-      case GroupBy(_, _) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(_, _) =>
-        throw new ExpressionException("As operation for Streams not yet implemented.")
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          // no expansions took place
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        // Without an explicit selection the join emits all left fields, then all right fields.
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(_, _) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          // no expansions took place
-          val translatedInput = translateInternal(input)
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      // Grouped and ungrouped aggregations are equally unsupported on streams.
-      case Aggregate(_, _) =>
-        throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter)
-    }
-  }
-
-  /**
-   * Applies a map operator to `input` that evaluates `fields` on every element and emits
-   * the results as a [[Row]]. Throws an [[ExpressionException]] when any of the `fields`
-   * contains an aggregation.
-   */
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: DataStream[I],
-      inputType: CompositeType[I]): DataStream[Row] = {
-
-    val containsAggregate = fields.exists(field => field.exists(_.isInstanceOf[Aggregation]))
-    if (containsAggregate) {
-      throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-    }
-
-    val rowType = new RowTypeInfo(fields)
-    val selectFunction = new ExpressionSelectFunction(inputType, rowType, fields)
-
-    input.transform(
-      s"select(${fields.mkString(",")})",
-      rowType,
-      new MapInvokable[I, Row](selectFunction))
-  }
-
-  /**
-   * Placeholder for joining two streams; always throws an [[ExpressionException]] because
-   * joins are not implemented for streams.
-   */
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: DataStream[L],
-      rightInput: DataStream[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): DataStream[Row] =
-    throw new ExpressionException("Join operation for Streams not yet implemented.")
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
deleted file mode 100644
index 724c8a7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.scala.wrap
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.ExpressionOperation
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-
-import scala.reflect.ClassTag
-
-
-/**
- * [[OperationTranslator]] that turns Scala [[DataSet]]s into [[ExpressionOperation]]s and
- * expression operations back into Scala [[DataSet]]s.
- *
- * All work is delegated to a [[JavaBatchTranslator]] operating on the wrapped Java data set.
- */
-class ScalaBatchTranslator extends OperationTranslator {
-
-  override type Representation[A] = DataSet[A]
-
-  private val javaTranslator = new JavaBatchTranslator
-
-  def createExpressionOperation[A](
-      repr: DataSet[A],
-      fields: Array[Expression]): ExpressionOperation[ScalaBatchTranslator] = {
-    // Build the operation on the underlying Java data set, then attach it to this translator.
-    val javaOperation = javaTranslator.createExpressionOperation(repr.javaSet, fields)
-    new ExpressionOperation[ScalaBatchTranslator](javaOperation.operation, this)
-  }
-
-  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = {
-    val javaResult = javaTranslator.translate[O](op)
-    // No ClassTag for O is available here, so the AnyRef tag is passed in its place.
-    val elementTag = ClassTag.AnyRef.asInstanceOf[ClassTag[O]]
-    wrap(javaResult)(elementTag)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
deleted file mode 100644
index 7db483f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.expressions.{ExpressionOperation, Row}
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-
-import org.apache.flink.streaming.api.scala.javaToScalaStream
-
-/**
- * [[OperationTranslator]] that turns Scala [[DataStream]]s into [[ExpressionOperation]]s and
- * expression operations back into Scala [[DataStream]]s.
- *
- * This is very limited right now: only select and filter are implemented, and the
- * expression operations must still be extended to allow windowing operations.
- */
-class ScalaStreamingTranslator extends OperationTranslator {
-
-  override type Representation[A] = DataStream[A]
-
-  private val javaTranslator = new JavaStreamingTranslator
-
-  def createExpressionOperation[A](
-      repr: DataStream[A],
-      fields: Array[Expression]): ExpressionOperation[ScalaStreamingTranslator] = {
-    // Build the operation on the underlying Java stream, then attach it to this translator.
-    val javaOperation = javaTranslator.createExpressionOperation(repr.getJavaStream, fields)
-    new ExpressionOperation[ScalaStreamingTranslator](javaOperation.operation, this)
-  }
-
-  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = {
-    // Translate with the Java translator and wrap the resulting Java stream.
-    val javaStream = javaTranslator.translate[O](op)
-    javaToScalaStream(javaStream)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
deleted file mode 100644
index 1f6c397..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.language.implicitConversions
-
-/**
- * The operators and methods of the Scala DSL for building an [[Expression]] AST for
- * expression operations.
- *
- * Implementors only supply `expr`; each member wraps it, together with the argument if there
- * is one, in the corresponding tree node.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.expressions.parser.ExpressionParser]].
- */
-trait ImplicitExpressionOperations {
-  /** The expression that the operations of this trait are applied to. */
-  def expr: Expression
-
-  // Boolean connectives
-  def && (other: Expression) = And(expr, other)
-  def || (other: Expression) = Or(expr, other)
-  def unary_! = Not(expr)
-
-  // Comparisons
-  def === (other: Expression) = EqualTo(expr, other)
-  def !== (other: Expression) = NotEqualTo(expr, other)
-  def > (other: Expression) = GreaterThan(expr, other)
-  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
-  def < (other: Expression) = LessThan(expr, other)
-  def <= (other: Expression) = LessThanOrEqual(expr, other)
-
-  // Null checks
-  def isNull = IsNull(expr)
-  def isNotNull = IsNotNull(expr)
-
-  // Arithmetic
-  def + (other: Expression) = Plus(expr, other)
-  def - (other: Expression) = Minus(expr, other)
-  def * (other: Expression) = Mul(expr, other)
-  def / (other: Expression) = Div(expr, other)
-  def % (other: Expression) = Mod(expr, other)
-  def unary_- = UnaryMinus(expr)
-  def abs = Abs(expr)
-
-  // Bitwise operations
-  def & (other: Expression) = BitwiseAnd(expr, other)
-  def | (other: Expression) = BitwiseOr(expr, other)
-  def ^ (other: Expression) = BitwiseXor(expr, other)
-  def unary_~ = BitwiseNot(expr)
-
-  // Aggregations
-  def sum = Sum(expr)
-  def min = Min(expr)
-  def max = Max(expr)
-  def count = Count(expr)
-  def avg = Avg(expr)
-
-  // When no end index is given, Int.MaxValue is used as the end index.
-  def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) =
-    Substring(expr, beginIndex, endIndex)
-
-  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
-  def as(name: Symbol) = Naming(expr, name.name)
-}
-
-/**
- * Implicit conversions of the Scala DSL: Scala literals and [[Symbol]]s become
- * [[Expression]]s, and expressions, symbols and literals gain the operators of
- * [[ImplicitExpressionOperations]].
- */
-trait ImplicitExpressionConversions {
-
-  // Plain conversions, used wherever an Expression is expected.
-  implicit def symbol2FieldExpression(sym: Symbol): Expression =
-    UnresolvedFieldReference(sym.name)
-  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
-  implicit def int2Literal(i: Int): Expression = Literal(i)
-  implicit def long2Literal(l: Long): Expression = Literal(l)
-  implicit def float2Literal(d: Float): Expression = Literal(d)
-  implicit def double2Literal(d: Double): Expression = Literal(d)
-  implicit def string2Literal(str: String): Expression = Literal(str)
-
-  // Wrappers that make the DSL operators available on expressions and symbols.
-  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
-    def expr = e
-  }
-
-  implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
-    def expr = UnresolvedFieldReference(s.name)
-  }
-
-  // Wrappers that make the DSL operators available on Scala literals.
-  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
-    def expr = Literal(bool)
-  }
-
-  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
-    def expr = Literal(i)
-  }
-
-  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
-    def expr = Literal(l)
-  }
-
-  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
-    def expr = Literal(f)
-  }
-
-  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
-    def expr = Literal(d)
-  }
-
-  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
-    def expr = Literal(str)
-  }
-}