You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:36 UTC
[07/12] flink git commit: [FLINK-1623] Rename Expression API to Table
API
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
deleted file mode 100644
index e3f62f8..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
-
-abstract class BinaryArithmetic extends BinaryExpression {
- def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- left.typeInfo
- }
-}
-
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
- override def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
- !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
- throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
- !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
- throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- left.typeInfo
- }
-
- override def toString = s"($left + $right)"
-}
-
-case class UnaryMinus(child: Expression) extends UnaryExpression {
- def typeInfo = {
- if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
- }
- child.typeInfo
- }
-
- override def toString = s"-($child)"
-}
-
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left - $right)"
-}
-
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left / $right)"
-}
-
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left * $right)"
-}
-
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left * $right)"
-}
-
-case class Abs(child: Expression) extends UnaryExpression {
- def typeInfo = child.typeInfo
-
- override def toString = s"abs($child)"
-}
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression {
- def typeInfo: TypeInformation[_] = {
- if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
- }
- if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
- left.typeInfo
- } else {
- BasicTypeInfo.INT_TYPE_INFO
- }
- }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left & $right)"
-}
-
-case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left | $right)"
-}
-
-
-case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left ^ $right)"
-}
-
-case class BitwiseNot(child: Expression) extends UnaryExpression {
- def typeInfo: TypeInformation[_] = {
- if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
- }
- if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
- child.typeInfo
- } else {
- BasicTypeInfo.INT_TYPE_INFO
- }
- }
-
- override def toString = s"~($child)"
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
deleted file mode 100644
index f83de5b..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
- def typeInfo = tpe
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
deleted file mode 100644
index fdb5fd0..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression {
- def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(s"Non-numeric operand ${left} in $this")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(s"Non-numeric operand ${right} in $this")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-}
-
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
- override def typeInfo = {
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"$left === $right"
-}
-
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
- override def typeInfo = {
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"$left !== $right"
-}
-
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left > $right"
-}
-
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left >= $right"
-}
-
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left < $right"
-}
-
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left <= $right"
-}
-
-case class IsNull(child: Expression) extends UnaryExpression {
- def typeInfo = {
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"($child).isNull"
-}
-
-case class IsNotNull(child: Expression) extends UnaryExpression {
- def typeInfo = {
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"($child).isNotNull"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
deleted file mode 100644
index a1d8589..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
- def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
- override def toString = "\"" + name
-}
-
-case class ResolvedFieldReference(
- override val name: String,
- tpe: TypeInformation[_]) extends LeafExpression {
- def typeInfo = tpe
-
- override def toString = s"'$name"
-}
-
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
- def typeInfo = child.typeInfo
-
- override def toString = s"$child as '$name"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
deleted file mode 100644
index 03949ee..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.expressions.ImplicitExpressionOperations
-
-object Literal {
- def apply(l: Any): Literal = l match {
- case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
- case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
- case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
- case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
- case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
- case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
- }
-}
-
-case class Literal(value: Any, tpe: TypeInformation[_])
- extends LeafExpression with ImplicitExpressionOperations {
- def expr = this
- def typeInfo = tpe
-
- override def toString = s"$value"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
deleted file mode 100644
index 8f0a068..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression {
- def typeInfo = {
- if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
- right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-}
-
-case class Not(child: Expression) extends UnaryExpression {
- def typeInfo = {
- if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override val name = Expression.freshName("not-" + child.name)
-
- override def toString = s"!($child)"
-}
-
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
- override def toString = s"$left && $right"
-
- override val name = Expression.freshName(left.name + "-and-" + right.name)
-}
-
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
- override def toString = s"$left || $right"
-
- override val name = Expression.freshName(left.name + "-or-" + right.name)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
deleted file mode 100644
index 04c29f7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object tree
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
deleted file mode 100644
index 175d445..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
- str: Expression,
- beginIndex: Expression,
- endIndex: Expression) extends Expression {
- def typeInfo = {
- if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
- throw new ExpressionException(
- s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
- }
- if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
- }
- if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
- }
-
- BasicTypeInfo.STRING_TYPE_INFO
- }
-
- override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
- override def toString = s"($str).substring($beginIndex, $endIndex)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
deleted file mode 100644
index 38c908d..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.operators.SingleInputOperator
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
- * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
- * disappears since the translation methods simply returns the input.
- */
-class RenameOperator[T](
- input: JavaDataSet[T],
- renamingTypeInformation: RenamingProxyTypeInfo[T])
- extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
-
- override protected def translateToDataFlow(
- input: Operator[T]): Operator[T] = input
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
deleted file mode 100644
index 0263f8a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
-
-/**
- * A TypeInformation that is used to rename fields of an underlying CompositeType. This
- * allows the system to translate "as" expression operations to a [[RenameOperator]]
- * that does not get translated to a runtime operator.
- */
-class RenamingProxyTypeInfo[T](
- tpe: CompositeType[T],
- fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) {
-
- def getUnderlyingType: CompositeType[T] = tpe
-
- if (tpe.getArity != fieldNames.length) {
- throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
- s"number of fields in underlying type $tpe do not match.")
- }
-
- if (fieldNames.toSet.size != fieldNames.length) {
- throw new IllegalArgumentException(s"New field names must be unique. " +
- s"Names: ${fieldNames.mkString(",")}.")
- }
-
- override def getFieldIndex(fieldName: String): Int = {
- val result = fieldNames.indexOf(fieldName)
- if (result != fieldNames.lastIndexOf(fieldName)) {
- -2
- } else {
- result
- }
- }
- override def getFieldNames: Array[String] = fieldNames
-
- override def isBasicType: Boolean = tpe.isBasicType
-
- override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
- tpe.createSerializer(executionConfig)
-
- override def getArity: Int = tpe.getArity
-
- override def isKeyType: Boolean = tpe.isKeyType
-
- override def getTypeClass: Class[T] = tpe.getTypeClass
-
- override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
-
- override def isTupleType: Boolean = tpe.isTupleType
-
- override def toString = {
- s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
- s"fields: ${fieldNames.mkString(", ")})"
- }
-
- override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
-
- override def getTotalFields: Int = tpe.getTotalFields
-
- override def createComparator(
- logicalKeyFields: Array[Int],
- orders: Array[Boolean],
- logicalFieldOffset: Int,
- executionConfig: ExecutionConfig) =
- tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
-
- // These are never called since we override create comparator
- override protected def initializeNewComparator(localKeyCount: Int): Unit =
- throw new RuntimeException("Cannot happen.")
-
- override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] =
- throw new RuntimeException("Cannot happen.")
-
- override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit =
- throw new RuntimeException("Cannot happen.")
-
- override def getFlatFields(
- fieldExpression: String,
- offset: Int,
- result: util.List[FlatFieldDescriptor]): Unit = {
- tpe.getFlatFields(fieldExpression, offset, result)
- }
-
- override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
- tpe.getTypeAt(fieldExpression)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
deleted file mode 100644
index 006c0c9..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.expressions.Row
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
-;
-
-/**
- * Serializer for [[Row]].
- */
-class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
- extends TypeSerializer[Row] {
-
- override def isImmutableType: Boolean = false
-
- override def getLength: Int = -1
-
- override def duplicate = this
-
- override def createInstance: Row = {
- new Row(fieldSerializers.length)
- }
-
- override def copy(from: Row, reuse: Row): Row = {
- val len = fieldSerializers.length
-
- if (from.productArity != len) {
- throw new RuntimeException("Row arity of reuse and from do not match.")
- }
- var i = 0
- while (i < len) {
- val reuseField = reuse.productElement(i)
- val fromField = from.productElement(i).asInstanceOf[AnyRef]
- val copy = fieldSerializers(i).copy(fromField, reuseField)
- reuse.setField(i, copy)
- i += 1
- }
- reuse
- }
-
- override def copy(from: Row): Row = {
- val len = fieldSerializers.length
-
- if (from.productArity != len) {
- throw new RuntimeException("Row arity of reuse and from do not match.")
- }
- val result = new Row(len)
- var i = 0
- while (i < len) {
- val fromField = from.productElement(i).asInstanceOf[AnyRef]
- val copy = fieldSerializers(i).copy(fromField)
- result.setField(i, copy)
- i += 1
- }
- result
- }
-
- override def serialize(value: Row, target: DataOutputView) {
- val len = fieldSerializers.length
- var i = 0
- while (i < len) {
- val serializer = fieldSerializers(i)
- serializer.serialize(value.productElement(i), target)
- i += 1
- }
- }
-
- override def deserialize(reuse: Row, source: DataInputView): Row = {
- val len = fieldSerializers.length
-
- if (reuse.productArity != len) {
- throw new RuntimeException("Row arity of reuse and fields do not match.")
- }
-
- var i = 0
- while (i < len) {
- val field = reuse.productElement(i).asInstanceOf[AnyRef]
- reuse.setField(i, fieldSerializers(i).deserialize(field, source))
- i += 1
- }
- reuse
- }
-
- override def deserialize(source: DataInputView): Row = {
- val len = fieldSerializers.length
-
- val result = new Row(len)
- var i = 0
- while (i < len) {
- result.setField(i, fieldSerializers(i).deserialize(source))
- i += 1
- }
- result
- }
-
- override def copy(source: DataInputView, target: DataOutputView): Unit = {
- val len = fieldSerializers.length
- var i = 0
- while (i < len) {
- fieldSerializers(i).copy(source, target)
- i += 1
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
deleted file mode 100644
index 92e9bc8..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.expressions.Row
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
-
-/**
- * TypeInformation for [[Row]].
- */
-class RowTypeInfo(
- fieldTypes: Seq[TypeInformation[_]],
- fieldNames: Seq[String])
- extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
-
- def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
-
- if (fieldNames.toSet.size != fieldNames.size) {
- throw new IllegalArgumentException("Field names must be unique.")
- }
-
- override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
- val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
- for (i <- 0 until getArity) {
- fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
- .asInstanceOf[TypeSerializer[Any]]
- }
-
- new RowSerializer(fieldSerializers)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
deleted file mode 100644
index ad7cfe4..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.expressions
-
-import org.apache.flink.api.expressions.ExpressionOperation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator
-import org.apache.flink.api.scala.expressions.JavaStreamingTranslator
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Convencience methods for creating an [[org.apache.flink.api.expressions.ExpressionOperation]]
- * and for converting an [[org.apache.flink.api.expressions.ExpressionOperation]] back
- * to a [[org.apache.flink.api.java.DataSet]] or
- * [[org.apache.flink.streaming.api.datastream.DataStream]].
- */
-object ExpressionUtil {
-
- /**
- * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]].
- * The fields of the DataSet type are renamed to the given set of fields:
- *
- * Example:
- *
- * {{{
- * ExpressionUtil.from(set, "a, b")
- * }}}
- *
- * This will transform the set containing elements of two fields to a table where the fields
- * are named a and b.
- */
- def from[T](set: DataSet[T], fields: String): ExpressionOperation[JavaBatchTranslator] = {
- new JavaBatchTranslator().createExpressionOperation(set, fields)
- }
-
- /**
- * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]].
- * The fields of the DataSet type are used to name the
- * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
- */
- def from[T](set: DataSet[T]): ExpressionOperation[JavaBatchTranslator] = {
- new JavaBatchTranslator().createExpressionOperation(set)
- }
-
- /**
- * Transforms the given DataStream to a [[org.apache.flink.api.expressions.ExpressionOperation]].
- * The fields of the DataSet type are renamed to the given set of fields:
- *
- * Example:
- *
- * {{{
- * ExpressionUtil.from(set, "a, b")
- * }}}
- *
- * This will transform the set containing elements of two fields to a table where the fields
- * are named a and b.
- */
- def from[T](set: DataStream[T], fields: String): ExpressionOperation[JavaStreamingTranslator] = {
- new JavaStreamingTranslator().createExpressionOperation(set, fields)
- }
-
- /**
- * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]].
- * The fields of the DataSet type are used to name the
- * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
- */
- def from[T](set: DataStream[T]): ExpressionOperation[JavaStreamingTranslator] = {
- new JavaStreamingTranslator().createExpressionOperation(set)
- }
-
- /**
- * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to
- * a DataSet. The given type must have exactly the same fields as the
- * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the
- * fields and the types must match.
- */
- @SuppressWarnings(Array("unchecked"))
- def toSet[T](
- op: ExpressionOperation[JavaBatchTranslator],
- clazz: Class[T]): DataSet[T] = {
- op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]]
- }
-
- /**
- * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to
- * a DataStream. The given type must have exactly the same fields as the
- * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the
- * fields and the types must match.
- */
- @SuppressWarnings(Array("unchecked"))
- def toStream[T](
- op: ExpressionOperation[JavaStreamingTranslator], clazz: Class[T]): DataStream[T] = {
- op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]]
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
deleted file mode 100644
index 567d19c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import org.apache.flink.api.scala._
-
-/**
- * Methods for converting a [[DataSet]] to an [[ExpressionOperation]]. A [[DataSet]] is
- * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.expressions]].
- */
-class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
-
- /**
- * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting
- * expression operation can be specified like this:
- *
- * {{{
- * val in: DataSet[(String, Int)] = ...
- * val expr = in.as('a, 'b)
- * }}}
- *
- * This results in an expression operation that has field `a` of type `String` and field `b`
- * of type `Int`.
- */
- def as(fields: Expression*): ExpressionOperation[ScalaBatchTranslator] = {
- new ScalaBatchTranslator().createExpressionOperation(set, fields.toArray)
- }
-
- /**
- * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting
- * expression operation will be taken from the field names of the input type:
- *
- * {{{
- * val in: DataSet[(String, Int)] = ...
- * val expr = in.toExpression
- * }}}
- *
- * This results in an expression operation that has field `_1` of type `String` and field `_2`
- * of type `Int`.
- */
- def toExpression: ExpressionOperation[ScalaBatchTranslator] = {
- val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
- as(resultFields: _*)
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
deleted file mode 100644
index 49dbce7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{Expression, UnresolvedFieldReference}
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.DataStream
-
-class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) {
-
- /**
- * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting
- * expression operation can be specified like this:
- *
- * {{{
- * val in: DataSet[(String, Int)] = ...
- * val expr = in.as('a, 'b)
- * }}}
- *
- * This results in an expression operation that has field `a` of type `String` and field `b`
- * of type `Int`.
- */
-
- def as(fields: Expression*): ExpressionOperation[ScalaStreamingTranslator] = {
- new ScalaStreamingTranslator().createExpressionOperation(set, fields.toArray)
- }
-
- /**
- * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting
- * expression operation will be taken from the field names of the input type:
- *
- * {{{
- * val in: DataSet[(String, Int)] = ...
- * val expr = in.toExpression
- * }}}
- *
- * This results in an expression operation that has field `_1` of type `String` and field `_2`
- * of type `Int`.
- */
-
- def toExpression: ExpressionOperation[ScalaStreamingTranslator] = {
- val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
- as(resultFields: _*)
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
deleted file mode 100644
index ae41ceb..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.expressions.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row}
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[JavaDataSet]]s and
- * translating them back to Scala [[JavaDataSet]]s.
- */
-class JavaBatchTranslator extends OperationTranslator {
-
- type Representation[A] = JavaDataSet[A]
-
-
- def createExpressionOperation[A](
- repr: JavaDataSet[A]): ExpressionOperation[JavaBatchTranslator] = {
- val fields =
- repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference)
-
- createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]], false)
- }
-
- def createExpressionOperation[A](
- repr: JavaDataSet[A],
- expression: String): ExpressionOperation[JavaBatchTranslator] = {
- val fields = ExpressionParser.parseExpressionList(expression)
-
- createExpressionOperation(repr, fields.toArray)
- }
-
- def createExpressionOperation[A](
- repr: JavaDataSet[A],
- fields: Array[Expression],
- checkDeterministicFields: Boolean = true): ExpressionOperation[JavaBatchTranslator] = {
-
- // shortcut for DataSet[Row]
- repr.getType match {
- case rowTypeInfo: RowTypeInfo =>
- val expressions = rowTypeInfo.getFieldNames map {
- name => (name, rowTypeInfo.getTypeAt(name))
- }
- new ExpressionOperation(
- Root(repr.asInstanceOf[JavaDataSet[Row]], expressions), this)
- case _ =>
- }
-
- val clazz = repr.getType.getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
- throw new ExpressionException("Cannot create expression Operation from DataSet of type " +
- clazz.getName + ". Only top-level classes or static members classes " +
- " are supported.")
- }
-
- if (!repr.getType.isInstanceOf[CompositeType[_]]) {
- throw new ExpressionException("Only DataSets of composite type can be transformed to an" +
- " Expression Operation. These would be tuples, case classes and POJOs.")
- }
-
- val inputType = repr.getType.asInstanceOf[CompositeType[A]]
-
- if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
- throw new ExpressionException(s"You cannot rename fields upon Table creaton: " +
- s"Field order of input type $inputType is not deterministic." )
- }
-
- if (fields.length != inputType.getFieldNames.length) {
- throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
- "' and number of fields in input type " + inputType + " do not match.")
- }
-
- val newFieldNames = fields map {
- case UnresolvedFieldReference(name) => name
- case e =>
- throw new ExpressionException("Only field references allowed in 'as' operation, " +
- " offending expression: " + e)
- }
-
- if (newFieldNames.toSet.size != newFieldNames.size) {
- throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
- }
-
- val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
- case (name, index) => (name, inputType.getTypeAt(index))
- }
-
- val inputFields = inputType.getFieldNames
- val fieldMappings = inputFields.zip(resultFields)
- val expressions: Array[Expression] = fieldMappings map {
- case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
- }
-
- val rowDataSet = createSelect(expressions, repr, inputType)
-
- new ExpressionOperation(Root(rowDataSet, resultFields), new JavaBatchTranslator)
- }
-
- override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
- if (tpe.getTypeClass == classOf[Row]) {
- // shortcut for DataSet[Row]
- return translateInternal(op).asInstanceOf[JavaDataSet[A]]
- }
-
- val clazz = tpe.getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
- throw new ExpressionException("Cannot create DataSet of type " +
- clazz.getName + ". Only top-level classes or static member classes are supported.")
- }
-
-
- if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
- throw new ExpressionException(
- "Expression operations can only be converted to composite types, type is: " +
- implicitly[TypeInformation[A]] +
- ". Composite types would be tuples, case classes and POJOs.")
- }
-
- val resultSet = translateInternal(op)
-
- val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
- val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
- val resultNames = resultType.getFieldNames
- val outputNames = outputType.getFieldNames.toSeq
-
- if (resultNames.toSet != outputNames.toSet) {
- throw new ExpressionException(s"Expression result type $resultType does not have the same" +
- s"fields as output type $outputType")
- }
-
- for (f <- outputNames) {
- val in = resultType.getTypeAt(resultType.getFieldIndex(f))
- val out = outputType.getTypeAt(outputType.getFieldIndex(f))
- if (!in.equals(out)) {
- throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
- s"output $outputType.")
- }
- }
-
- val outputFields = outputNames map {
- f => ResolvedFieldReference(f, resultType.getTypeAt(f))
- }
-
- val function = new ExpressionSelectFunction(
- resultSet.getType.asInstanceOf[RowTypeInfo],
- outputType,
- outputFields)
-
- val opName = s"select(${outputFields.mkString(",")})"
- val operator = new MapOperator(resultSet, outputType, function, opName)
-
- operator
- }
-
- private def translateInternal(op: Operation): JavaDataSet[Row] = {
- op match {
- case Root(dataSet: JavaDataSet[Row], resultFields) =>
- dataSet
-
- case Root(_, _) =>
- throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op)
-
- case GroupBy(_, fields) =>
- throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
- "SELECT statement?")
-
- case As(input, newNames) =>
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
- val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
- new RenameOperator(translatedInput, proxyType)
-
- case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
- val expandedInput = ExpandAggregations(sel)
-
- if (expandedInput.eq(sel)) {
- val translatedLeftInput = translateInternal(leftInput)
- val translatedRightInput = translateInternal(rightInput)
- val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
- val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
- createJoin(
- predicate,
- selection,
- translatedLeftInput,
- translatedRightInput,
- leftInType,
- rightInType,
- JoinHint.OPTIMIZER_CHOOSES)
- } else {
- translateInternal(expandedInput)
- }
-
- case Filter(Join(leftInput, rightInput), predicate) =>
- val translatedLeftInput = translateInternal(leftInput)
- val translatedRightInput = translateInternal(rightInput)
- val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
- val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
- createJoin(
- predicate,
- leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
- rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
- translatedLeftInput,
- translatedRightInput,
- leftInType,
- rightInType,
- JoinHint.OPTIMIZER_CHOOSES)
-
- case Join(leftInput, rightInput) =>
- throw new ExpressionException("Join without filter condition encountered. " +
- "Did you forget to add .where(...) ?")
-
- case sel@Select(input, selection) =>
-
- val expandedInput = ExpandAggregations(sel)
-
- if (expandedInput.eq(sel)) {
- // no expansions took place
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
- val inputFields = inType.getFieldNames
- createSelect(
- selection,
- translatedInput,
- inType)
- } else {
- translateInternal(expandedInput)
- }
-
- case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
- val keyIndices = groupExpressions map {
- case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
- case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
- }
-
- val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-
- val grouping = new UnsortedGrouping(translatedInput, keys)
-
- val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
- case (fieldName, fun) =>
- fun.getFactory.createAggregationFunction[Any](
- inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
- }
-
- val aggIndices = aggregations map {
- case (fieldName, _) =>
- inType.getFieldIndex(fieldName)
- }
-
- val result = new GroupReduceOperator(
- grouping,
- inType,
- new ExpressionAggregateFunction(aggIndices, aggFunctions),
- "Expression Aggregation: " + agg)
-
- result
-
- case agg@Aggregate(input, aggregations) =>
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
- val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
- case (fieldName, fun) =>
- fun.getFactory.createAggregationFunction[Any](
- inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
- }
-
- val aggIndices = aggregations map {
- case (fieldName, _) =>
- inType.getFieldIndex(fieldName)
- }
-
- val result = new GroupReduceOperator(
- translatedInput,
- inType,
- new ExpressionAggregateFunction(aggIndices, aggFunctions),
- "Expression Aggregation: " + agg)
-
- result
-
-
- case Filter(input, predicate) =>
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
- val filter = new ExpressionFilterFunction[Row](predicate, inType)
- translatedInput.filter(filter)
- }
- }
-
- private def createSelect[I](
- fields: Seq[Expression],
- input: JavaDataSet[I],
- inputType: CompositeType[I]): JavaDataSet[Row] = {
-
- fields foreach {
- f =>
- if (f.exists(_.isInstanceOf[Aggregation])) {
- throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
- }
-
- }
-
- val resultType = new RowTypeInfo(fields)
-
- val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
- val opName = s"select(${fields.mkString(",")})"
- val operator = new MapOperator(input, resultType, function, opName)
-
- operator
- }
-
- private def createJoin[L, R](
- predicate: Expression,
- fields: Seq[Expression],
- leftInput: JavaDataSet[L],
- rightInput: JavaDataSet[R],
- leftType: CompositeType[L],
- rightType: CompositeType[R],
- joinHint: JoinHint): JavaDataSet[Row] = {
-
- val resultType = new RowTypeInfo(fields)
-
- val (reducedPredicate, leftFields, rightFields) =
- ExtractEquiJoinFields(leftType, rightType, predicate)
-
- if (leftFields.isEmpty || rightFields.isEmpty) {
- throw new ExpressionException("Could not derive equi-join predicates " +
- "for predicate " + predicate + ".")
- }
-
- val leftKey = new ExpressionKeys[L](leftFields, leftType)
- val rightKey = new ExpressionKeys[R](rightFields, rightType)
-
- val joiner = new ExpressionJoinFunction[L, R, Row](
- reducedPredicate,
- leftType,
- rightType,
- resultType,
- fields)
-
- new EquiJoin[L, R, Row](
- leftInput,
- rightInput,
- leftKey,
- rightKey,
- joiner,
- resultType,
- joinHint,
- predicate.toString)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
deleted file mode 100644
index 56c38af..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
-import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[DataStream]]s and
- * translating them back to Java [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. Also, the expression
- * operations must be extended to allow windowing operations.
- */
-
-class JavaStreamingTranslator extends OperationTranslator {
-
- type Representation[A] = DataStream[A]
-
-
- def createExpressionOperation[A](
- repr: DataStream[A]): ExpressionOperation[JavaStreamingTranslator] = {
- val fields =
- repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference)
-
- createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]])
- }
-
- def createExpressionOperation[A](
- repr: DataStream[A],
- expression: String): ExpressionOperation[JavaStreamingTranslator] = {
- val fields = ExpressionParser.parseExpressionList(expression)
-
- createExpressionOperation(repr, fields.toArray)
- }
-
- def createExpressionOperation[A](
- repr: DataStream[A],
- fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] = {
-
- // shortcut for DataSet[Row]
- repr.getType match {
- case rowTypeInfo: RowTypeInfo =>
- val expressions = rowTypeInfo.getFieldNames map {
- name => (name, rowTypeInfo.getTypeAt(name))
- }
- new ExpressionOperation(
- Root(repr.asInstanceOf[DataStream[Row]], expressions), this)
- case _ =>
- }
-
- val clazz = repr.getType.getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
- throw new ExpressionException("Cannot create expression Operation from DataSet of type " +
- clazz.getName + ". Only top-level classes or static members classes " +
- " are supported.")
- }
-
- if (!repr.getType.isInstanceOf[CompositeType[_]]) {
- throw new ExpressionException("Only DataSets of composite type can be transformed to an" +
- " Expression Operation. These would be tuples, case classes and POJOs.")
- }
-
- val inputType = repr.getType.asInstanceOf[CompositeType[A]]
-
- if (fields.length != inputType.getFieldNames.length) {
- throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
- "' and number of fields in input type " + inputType + " do not match.")
- }
-
- val newFieldNames = fields map {
- case UnresolvedFieldReference(name) => name
- case e =>
- throw new ExpressionException("Only field references allowed in 'as' operation, " +
- " offending expression: " + e)
- }
-
- if (newFieldNames.toSet.size != newFieldNames.size) {
- throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
- }
-
- val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
- case (name, index) => (name, inputType.getTypeAt(index))
- }
-
- val inputFields = inputType.getFieldNames
- val fieldMappings = inputFields.zip(resultFields)
- val expressions: Array[Expression] = fieldMappings map {
- case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
- }
-
- val rowDataSet = createSelect(expressions, repr, inputType)
-
- new ExpressionOperation(Root(rowDataSet, resultFields), new JavaStreamingTranslator)
- }
-
- override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
- if (tpe.getTypeClass == classOf[Row]) {
- // shortcut for DataSet[Row]
- return translateInternal(op).asInstanceOf[DataStream[A]]
- }
-
- val clazz = tpe.getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
- throw new ExpressionException("Cannot create DataStream of type " +
- clazz.getName + ". Only top-level classes or static member classes are supported.")
- }
-
- if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
- throw new ExpressionException(
- "Expression operations can only be converted to composite types, type is: " +
- implicitly[TypeInformation[A]] +
- ". Composite types would be tuples, case classes and POJOs.")
-
- }
-
- val resultSet = translateInternal(op)
-
- val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
- val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
- val resultNames = resultType.getFieldNames
- val outputNames = outputType.getFieldNames.toSeq
-
- if (resultNames.toSet != outputNames.toSet) {
- throw new ExpressionException(s"Expression result type $resultType does not have the same" +
- s"fields as output type $outputType")
- }
-
- for (f <- outputNames) {
- val in = resultType.getTypeAt(resultType.getFieldIndex(f))
- val out = outputType.getTypeAt(outputType.getFieldIndex(f))
- if (!in.equals(out)) {
- throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
- s"output $outputType.")
- }
- }
-
- val outputFields = outputNames map {
- f => ResolvedFieldReference(f, resultType.getTypeAt(f))
- }
-
- val function = new ExpressionSelectFunction(
- resultSet.getType.asInstanceOf[RowTypeInfo],
- outputType,
- outputFields)
-
- val opName = s"select(${outputFields.mkString(",")})"
-
- resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
- }
-
- private def translateInternal(op: Operation): DataStream[Row] = {
- op match {
- case Root(dataSet: DataStream[Row], resultFields) =>
- dataSet
-
- case Root(_, _) =>
- throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op)
-
- case GroupBy(_, fields) =>
- throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
- "SELECT statement?")
-
- case As(input, newNames) =>
- throw new ExpressionException("As operation for Streams not yet implemented.")
-
- case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
- val expandedInput = ExpandAggregations(sel)
-
- if (expandedInput.eq(sel)) {
- val translatedLeftInput = translateInternal(leftInput)
- val translatedRightInput = translateInternal(rightInput)
- val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
- val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
- createJoin(
- predicate,
- selection,
- translatedLeftInput,
- translatedRightInput,
- leftInType,
- rightInType,
- JoinHint.OPTIMIZER_CHOOSES)
- } else {
- translateInternal(expandedInput)
- }
-
- case Filter(Join(leftInput, rightInput), predicate) =>
- val translatedLeftInput = translateInternal(leftInput)
- val translatedRightInput = translateInternal(rightInput)
- val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
- val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
- createJoin(
- predicate,
- leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
- rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
- translatedLeftInput,
- translatedRightInput,
- leftInType,
- rightInType,
- JoinHint.OPTIMIZER_CHOOSES)
-
- case Join(leftInput, rightInput) =>
- throw new ExpressionException("Join without filter condition encountered. " +
- "Did you forget to add .where(...) ?")
-
- case sel@Select(input, selection) =>
-
- val expandedInput = ExpandAggregations(sel)
-
- if (expandedInput.eq(sel)) {
- // no expansions took place
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
- val inputFields = inType.getFieldNames
- createSelect(
- selection,
- translatedInput,
- inType)
- } else {
- translateInternal(expandedInput)
- }
-
- case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
- throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
-
- case agg@Aggregate(input, aggregations) =>
- throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
-
- case Filter(input, predicate) =>
- val translatedInput = translateInternal(input)
- val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
- val filter = new ExpressionFilterFunction[Row](predicate, inType)
- translatedInput.filter(filter)
- }
- }
-
- private def createSelect[I](
- fields: Seq[Expression],
- input: DataStream[I],
- inputType: CompositeType[I]): DataStream[Row] = {
-
- fields foreach {
- f =>
- if (f.exists(_.isInstanceOf[Aggregation])) {
- throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
- }
-
- }
-
- val resultType = new RowTypeInfo(fields)
-
- val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
- val opName = s"select(${fields.mkString(",")})"
-
- input.transform(opName, resultType, new MapInvokable[I, Row](function))
- }
-
- private def createJoin[L, R](
- predicate: Expression,
- fields: Seq[Expression],
- leftInput: DataStream[L],
- rightInput: DataStream[R],
- leftType: CompositeType[L],
- rightType: CompositeType[R],
- joinHint: JoinHint): DataStream[Row] = {
-
- throw new ExpressionException("Join operation for Streams not yet implemented.")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
deleted file mode 100644
index 724c8a7..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.scala.wrap
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.ExpressionOperation
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-
-import scala.reflect.ClassTag
-
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataSet]]s and
- * translating them back to Scala [[DataSet]]s.
- */
-class ScalaBatchTranslator extends OperationTranslator {
-
- private val javaTranslator = new JavaBatchTranslator
-
- override type Representation[A] = DataSet[A]
-
- def createExpressionOperation[A](
- repr: DataSet[A],
- fields: Array[Expression]): ExpressionOperation[ScalaBatchTranslator] = {
-
- val result = javaTranslator.createExpressionOperation(repr.javaSet, fields)
-
- new ExpressionOperation[ScalaBatchTranslator](result.operation, this)
- }
-
- override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = {
- // fake it till you make it ...
- wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
deleted file mode 100644
index 7db483f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.expressions.{ExpressionOperation, Row}
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-
-import org.apache.flink.streaming.api.scala.javaToScalaStream
-
-/**
- * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataStream]]s and
- * translating them back to Scala [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. Also, the expression
- * operations must be extended to allow windowing operations.
- */
-class ScalaStreamingTranslator extends OperationTranslator {
-
- private val javaTranslator = new JavaStreamingTranslator
-
- override type Representation[A] = DataStream[A]
-
- def createExpressionOperation[A](
- repr: DataStream[A],
- fields: Array[Expression]): ExpressionOperation[ScalaStreamingTranslator] = {
-
- val result = javaTranslator.createExpressionOperation(repr.getJavaStream, fields)
-
- new ExpressionOperation[ScalaStreamingTranslator](result.operation, this)
- }
-
- override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = {
- // fake it till you make it ...
- javaToScalaStream(javaTranslator.translate(op))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
deleted file mode 100644
index 1f6c397..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.language.implicitConversions
-
-/**
- * These are all the operations that can be used to construct an [[Expression]] AST for expression
- * operations.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.expressions.parser.ExpressionParser]].
- */
-trait ImplicitExpressionOperations {
- def expr: Expression
-
- def && (other: Expression) = And(expr, other)
- def || (other: Expression) = Or(expr, other)
-
- def > (other: Expression) = GreaterThan(expr, other)
- def >= (other: Expression) = GreaterThanOrEqual(expr, other)
- def < (other: Expression) = LessThan(expr, other)
- def <= (other: Expression) = LessThanOrEqual(expr, other)
-
- def === (other: Expression) = EqualTo(expr, other)
- def !== (other: Expression) = NotEqualTo(expr, other)
-
- def unary_! = Not(expr)
- def unary_- = UnaryMinus(expr)
-
- def isNull = IsNull(expr)
- def isNotNull = IsNotNull(expr)
-
- def + (other: Expression) = Plus(expr, other)
- def - (other: Expression) = Minus(expr, other)
- def / (other: Expression) = Div(expr, other)
- def * (other: Expression) = Mul(expr, other)
- def % (other: Expression) = Mod(expr, other)
-
- def & (other: Expression) = BitwiseAnd(expr, other)
- def | (other: Expression) = BitwiseOr(expr, other)
- def ^ (other: Expression) = BitwiseXor(expr, other)
- def unary_~ = BitwiseNot(expr)
-
- def abs = Abs(expr)
-
- def sum = Sum(expr)
- def min = Min(expr)
- def max = Max(expr)
- def count = Count(expr)
- def avg = Avg(expr)
-
- def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
- Substring(expr, beginIndex, endIndex)
- }
-
- def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
- def as(name: Symbol) = Naming(expr, name.name)
-}
-
-/**
- * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
- * to [[ImplicitExpressionOperations]].
- */
-trait ImplicitExpressionConversions {
- implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
- def expr = e
- }
-
- implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
- def expr = UnresolvedFieldReference(s.name)
- }
-
- implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
- def expr = Literal(l)
- }
-
- implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
- def expr = Literal(i)
- }
-
- implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
- def expr = Literal(f)
- }
-
- implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
- def expr = Literal(d)
- }
-
- implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
- def expr = Literal(str)
- }
-
- implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
- def expr = Literal(bool)
- }
-
- implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
- implicit def int2Literal(i: Int): Expression = Literal(i)
- implicit def long2Literal(l: Long): Expression = Literal(l)
- implicit def double2Literal(d: Double): Expression = Literal(d)
- implicit def float2Literal(d: Float): Expression = Literal(d)
- implicit def string2Literal(str: String): Expression = Literal(str)
- implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
-}