You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/03 10:34:56 UTC
[2/4] flink git commit: [FLINK-1788] [table] Make logical plans transformable
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
index b0e2d05..ba376f5 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table.runtime
import org.apache.flink.api.table.codegen.GenerateUnaryPredicate
-import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.configuration.Configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
index f0f5636..f5616d3 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.api.table.runtime
-import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
import org.apache.flink.api.common.functions.RichFlatJoinFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler,
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
index 0a2830b..16e256a 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.api.table.runtime
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
deleted file mode 100644
index 6302572..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
-
-import scala.language.postfixOps
-
-
-abstract class Expression extends Product {
- def children: Seq[Expression]
- def name: String = Expression.freshName("expression")
- def typeInfo: TypeInformation[_]
-
- /**
- * Tests for equality by first testing for reference equality.
- */
- def fastEquals(other: Expression): Boolean = this.eq(other) || this == other
-
- def transformPre(rule: PartialFunction[Expression, Expression]): Expression = {
- val afterTransform = rule.applyOrElse(this, identity[Expression])
-
- if (afterTransform fastEquals this) {
- this.transformChildrenPre(rule)
- } else {
- afterTransform.transformChildrenPre(rule)
- }
- }
-
- def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = {
- var changed = false
- val newArgs = productIterator map {
- case child: Expression if children.contains(child) =>
- val newChild = child.transformPre(rule)
- if (newChild fastEquals child) {
- child
- } else {
- changed = true
- newChild
- }
- case other: AnyRef => other
- case null => null
- } toArray
-
- if (changed) makeCopy(newArgs) else this
- }
-
- def transformPost(rule: PartialFunction[Expression, Expression]): Expression = {
- val afterChildren = transformChildrenPost(rule)
- if (afterChildren fastEquals this) {
- rule.applyOrElse(this, identity[Expression])
- } else {
- rule.applyOrElse(afterChildren, identity[Expression])
- }
- }
-
- def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = {
- var changed = false
- val newArgs = productIterator map {
- case child: Expression if children.contains(child) =>
- val newChild = child.transformPost(rule)
- if (newChild fastEquals child) {
- child
- } else {
- changed = true
- newChild
- }
- case other: AnyRef => other
- case null => null
- } toArray
- // toArray forces evaluation, toSeq does not seem to work here
-
- if (changed) makeCopy(newArgs) else this
- }
-
- def exists(predicate: Expression => Boolean): Boolean = {
- var exists = false
- this.transformPre {
- case e: Expression => if (predicate(e)) {
- exists = true
- }
- e
- }
- exists
- }
-
- /**
- * Creates a new copy of this expression with new children. This is used during transformation
- * if children change. This must be overridden by Expressions that don't have the Constructor
- * arguments in the same order as the `children`.
- */
- def makeCopy(newArgs: Seq[AnyRef]): this.type = {
- val defaultCtor =
- this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
- try {
- defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
- } catch {
- case iae: IllegalArgumentException =>
- println("IAE " + this)
- throw new RuntimeException("Should never happen.")
- }
- }
-}
-
-abstract class BinaryExpression() extends Expression {
- def left: Expression
- def right: Expression
- def children = Seq(left, right)
-}
-
-abstract class UnaryExpression() extends Expression {
- def child: Expression
- def children = Seq(child)
-}
-
-abstract class LeafExpression() extends Expression {
- val children = Nil
-}
-
-case class NopExpression() extends LeafExpression {
- val typeInfo = new NothingTypeInfo()
- override val name = Expression.freshName("nop")
-
-}
-
-object Expression {
- def freshName(prefix: String): String = {
- s"$prefix-${freshNameCounter.getAndIncrement}"
- }
-
- val freshNameCounter = new AtomicInteger
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
deleted file mode 100644
index e5cdac5..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.aggregation.Aggregations
-
-
-abstract sealed class Aggregation extends UnaryExpression {
- def typeInfo = {
- child.typeInfo match {
- case BasicTypeInfo.LONG_TYPE_INFO => // ok
- case BasicTypeInfo.INT_TYPE_INFO =>
- case BasicTypeInfo.DOUBLE_TYPE_INFO =>
- case BasicTypeInfo.FLOAT_TYPE_INFO =>
- case BasicTypeInfo.BYTE_TYPE_INFO =>
- case BasicTypeInfo.SHORT_TYPE_INFO =>
- case _ =>
- throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
- s"aggregation $this. Only numeric data types supported.")
- }
- child.typeInfo
- }
-
- override def toString = s"Aggregate($child)"
-
- def getIntermediateFields: Seq[Expression]
- def getFinalField(inputs: Seq[Expression]): Expression
- def getAggregations: Seq[Aggregations]
-}
-
-case class Sum(child: Expression) extends Aggregation {
- override def toString = s"($child).sum"
-
- override def getIntermediateFields: Seq[Expression] = Seq(child)
- override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
- override def getAggregations = Seq(Aggregations.SUM)
-}
-
-case class Min(child: Expression) extends Aggregation {
- override def toString = s"($child).min"
-
- override def getIntermediateFields: Seq[Expression] = Seq(child)
- override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
- override def getAggregations = Seq(Aggregations.MIN)
-
-}
-
-case class Max(child: Expression) extends Aggregation {
- override def toString = s"($child).max"
-
- override def getIntermediateFields: Seq[Expression] = Seq(child)
- override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
- override def getAggregations = Seq(Aggregations.MAX)
-}
-
-case class Count(child: Expression) extends Aggregation {
- override def typeInfo = {
- child.typeInfo match {
- case _ => // we can count anything... :D
- }
- BasicTypeInfo.INT_TYPE_INFO
- }
-
- override def toString = s"($child).count"
-
- override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
- override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
- override def getAggregations = Seq(Aggregations.SUM)
-
-}
-
-case class Avg(child: Expression) extends Aggregation {
- override def toString = s"($child).avg"
-
- override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
- // This is just sweet. Use our own AST representation and let the code generator do
- // our dirty work.
- override def getFinalField(inputs: Seq[Expression]): Expression =
- Div(inputs(0), inputs(1))
- override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
deleted file mode 100644
index 84f9b18..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
-
-abstract class BinaryArithmetic extends BinaryExpression {
- def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- left.typeInfo
- }
-}
-
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
- override def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
- !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
- throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
- !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
- throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- left.typeInfo
- }
-
- override def toString = s"($left + $right)"
-}
-
-case class UnaryMinus(child: Expression) extends UnaryExpression {
- def typeInfo = {
- if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
- }
- child.typeInfo
- }
-
- override def toString = s"-($child)"
-}
-
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left - $right)"
-}
-
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left / $right)"
-}
-
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left * $right)"
-}
-
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
- override def toString = s"($left * $right)"
-}
-
-case class Abs(child: Expression) extends UnaryExpression {
- def typeInfo = child.typeInfo
-
- override def toString = s"abs($child)"
-}
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression {
- def typeInfo: TypeInformation[_] = {
- if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
- }
- if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
- left.typeInfo
- } else {
- BasicTypeInfo.INT_TYPE_INFO
- }
- }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left & $right)"
-}
-
-case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left | $right)"
-}
-
-
-case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
- override def toString = s"($left ^ $right)"
-}
-
-case class BitwiseNot(child: Expression) extends UnaryExpression {
- def typeInfo: TypeInformation[_] = {
- if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
- }
- if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
- child.typeInfo
- } else {
- BasicTypeInfo.INT_TYPE_INFO
- }
- }
-
- override def toString = s"~($child)"
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
deleted file mode 100644
index a3acc35..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
- def typeInfo = tpe
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
deleted file mode 100644
index e0a34a9..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression {
- def typeInfo = {
- if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(s"Non-numeric operand ${left} in $this")
- }
- if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
- throw new ExpressionException(s"Non-numeric operand ${right} in $this")
- }
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-}
-
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
- override def typeInfo = {
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"$left === $right"
-}
-
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
- override def typeInfo = {
- if (left.typeInfo != right.typeInfo) {
- throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"$left !== $right"
-}
-
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left > $right"
-}
-
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left >= $right"
-}
-
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left < $right"
-}
-
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
- override def toString = s"$left <= $right"
-}
-
-case class IsNull(child: Expression) extends UnaryExpression {
- def typeInfo = {
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"($child).isNull"
-}
-
-case class IsNotNull(child: Expression) extends UnaryExpression {
- def typeInfo = {
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override def toString = s"($child).isNotNull"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
deleted file mode 100644
index cc42148..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
- def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
- override def toString = "\"" + name
-}
-
-case class ResolvedFieldReference(
- override val name: String,
- tpe: TypeInformation[_]) extends LeafExpression {
- def typeInfo = tpe
-
- override def toString = s"'$name"
-}
-
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
- def typeInfo = child.typeInfo
-
- override def toString = s"$child as '$name"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
deleted file mode 100644
index 852d5a1..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.table.ImplicitExpressionOperations
-
-object Literal {
- def apply(l: Any): Literal = l match {
- case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
- case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
- case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
- case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
- case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
- case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
- }
-}
-
-case class Literal(value: Any, tpe: TypeInformation[_])
- extends LeafExpression with ImplicitExpressionOperations {
- def expr = this
- def typeInfo = tpe
-
- override def toString = s"$value"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
deleted file mode 100644
index 8ab838d..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression {
- def typeInfo = {
- if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
- right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
- s"${right.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-}
-
-case class Not(child: Expression) extends UnaryExpression {
- def typeInfo = {
- if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
- }
- BasicTypeInfo.BOOLEAN_TYPE_INFO
- }
-
- override val name = Expression.freshName("not-" + child.name)
-
- override def toString = s"!($child)"
-}
-
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
- override def toString = s"$left && $right"
-
- override val name = Expression.freshName(left.name + "-and-" + right.name)
-}
-
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
- override def toString = s"$left || $right"
-
- override val name = Expression.freshName(left.name + "-or-" + right.name)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
deleted file mode 100644
index caac402..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object tree
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
deleted file mode 100644
index e14374f..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.tree
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
- str: Expression,
- beginIndex: Expression,
- endIndex: Expression) extends Expression {
- def typeInfo = {
- if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
- throw new ExpressionException(
- s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
- }
- if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
- }
- if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- throw new ExpressionException(
- s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
- }
-
- BasicTypeInfo.STRING_TYPE_INFO
- }
-
- override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
- override def toString = s"($str).substring($beginIndex, $endIndex)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
new file mode 100644
index 0000000..87051cf
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to
+ * provide the chain of rules that are invoked one after another. The tree resulting
+ * from one rule is fed into the next rule and the final result is returned from method `analyze`.
+ */
+abstract class Analyzer[A <: TreeNode[A]] {
+
+  /** The rule chain, in the order in which `analyze` applies it. */
+  def rules: Seq[Rule[A]]
+
+  /**
+   * Threads `expr` through `rules`: each rule is re-applied until its result `fastEquals`
+   * its input, and that result is then handed to the next rule in the chain.
+   */
+  final def analyze(expr: A): A =
+    rules.foldLeft(expr) { (tree, rule) => applyUntilStable(rule, tree) }
+
+  /** Applies `rule` repeatedly and returns the first result that `fastEquals` its input. */
+  @scala.annotation.tailrec
+  private def applyUntilStable(rule: Rule[A], tree: A): A = {
+    val next = rule(tree)
+    if (next fastEquals tree) next else applyUntilStable(rule, next)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
new file mode 100644
index 0000000..b8a27cb
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets a tree
+ * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]]
+ * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain.
+ *
+ * The [[Analyzer]] applies a [[Rule]] to a tree repeatedly until the result `fastEquals` its
+ * input, so a rule must eventually stop changing the tree or `analyze` will not terminate.
+ */
+abstract class Rule[A <: TreeNode[A]] {
+ def apply(expr: A): A
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
new file mode 100644
index 0000000..84f1d7e
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ */
+abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  def children: Seq[A]
+
+  /** Tests for equality by first testing for reference equality. */
+  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  /** Applies `rule` to this node (if defined for it), then transforms the result's children. */
+  def transformPre(rule: PartialFunction[A, A]): A = {
+    val afterTransform = rule.applyOrElse(this, identity[A])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  /** Runs `transformPre(rule)` on every child; returns `this` if no child changed. */
+  def transformChildrenPre(rule: PartialFunction[A, A]): A =
+    transformChildren(_.transformPre(rule))
+
+  /** Transforms the children first, then applies `rule` (if defined) to the resulting node. */
+  def transformPost(rule: PartialFunction[A, A]): A = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  /** Runs `transformPost(rule)` on every child; returns `this` if no child changed. */
+  def transformChildrenPost(rule: PartialFunction[A, A]): A =
+    transformChildren(_.transformPost(rule))
+
+  /**
+   * Shared implementation of `transformChildrenPre` and `transformChildrenPost`: applies
+   * `transformChild` to the constructor arguments that are listed in `children`, hands all
+   * other arguments through, and calls `makeCopy` only if some child actually changed.
+   */
+  private def transformChildren(transformChild: A => A): A = {
+    var changed = false
+    val newArgs = productIterator.map {
+      case child: A if children.contains(child) =>
+        val newChild = transformChild(child)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    }.toArray
+    // toArray forces evaluation of the iterator, so `changed` is settled before it is read
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns `true` if `predicate` holds for this node or for any node that `transformPre`
+   * visits below it. The predicate is evaluated on every visited node (no short-circuit).
+   */
+  def exists(predicate: A => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: A => if (predicate(e)) {
+        exists = true
+      }
+      e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change. This must be overridden by tree nodes that don't have the Constructor
+   * arguments in the same order as the `children`.
+   *
+   * Throws a `RuntimeException` (carrying the cause) if `newArgs` do not fit the constructor.
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    val defaultCtor = this.getClass.getConstructors.find(_.getParameterTypes.nonEmpty)
+      .getOrElse(throw new NoSuchElementException(s"No constructor with arguments: $this"))
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        throw new RuntimeException(s"Cannot copy $this with arguments $newArgs", iae)
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
index 7ffa91c..db3c881 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.table.Row
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
deleted file mode 100644
index 604bdcf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.tree.Literal
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.examples.java.graph.util.PageRankData
-import org.apache.flink.util.Collector
-
-import _root_.scala.collection.JavaConverters._
-
-/**
-* A basic implementation of the Page Rank algorithm using a bulk iteration.
-*
-* This implementation requires a set of pages and a set of directed links as input and works as
-* follows.
-*
-* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
-* page collects the partial ranks of all pages that point to it, sums them up, and applies a
-* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
-* with the new ranks of all pages. This implementation terminates after a fixed number of
-* iterations. This is the Wikipedia entry for the
-* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
-*
-* Input files are plain text files and must be formatted as follows:
-*
-* - Pages represented as an (long) ID separated by new-line characters.
-* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
-* - Links are represented as pairs of page IDs which are separated by space characters. Links
-* are separated by new-line characters.
-* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12),
-* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
-* at least one incoming and one outgoing link (a page can point to itself).
-*
-* Usage:
-* {{{
-* PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
-* }}}
-*
-* If no parameters are provided, the program is run with default data from
-* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
-*
-* This example shows how to use:
-*
-* - Bulk Iterations
-* - Table API expressions
-*/
-object PageRankExpression {
-
- private final val DAMPENING_FACTOR: Double = 0.85
- private final val EPSILON: Double = 0.0001
-
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- // set up execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // read input data
- val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
- .as('pageId, 'rank)
-
- val links = getLinksDataSet(env)
-
- // build adjacency list from link input
- val adjacencyLists = links
- .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
-
- override def reduce(
- values: _root_.java.lang.Iterable[Link],
- out: Collector[AdjacencyList]): Unit = {
- var outputId = -1L
- val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
- out.collect(new AdjacencyList(outputId, outputList.toArray))
- }
-
- }).as('sourceId, 'targetIds)
-
- // start iteration
- val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
- currentRanks =>
- val newRanks = currentRanks.toTable
- // distribute ranks to target pages
- .join(adjacencyLists).where('pageId === 'sourceId)
- .select('rank, 'targetIds).as[RankOutput]
- .flatMap {
- (in, out: Collector[(Long, Double)]) =>
- val targets = in.targetIds
- val len = targets.length
- targets foreach { t => out.collect((t, in.rank / len )) }
- }
- .as('pageId, 'rank)
- // collect ranks and sum them up
- .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
- // apply dampening factor
- .select(
- 'pageId,
- ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
-
-
- val termination = currentRanks.toTable
- .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
- .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
-
- (newRanks, termination)
- }
-
- val result = finalRanks
-
- // emit result
- if (fileOutput) {
- result.writeAsCsv(outputPath, "\n", " ")
- } else {
- result.print()
- }
-
- // execute program
- env.execute("Expression PageRank Example")
- }
-
- // *************************************************************************
- // USER TYPES
- // *************************************************************************
-
- case class Link(sourceId: Long, targetId: Long)
-
- case class Page(pageId: Long, rank: Double)
-
- case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
- case class RankOutput(rank: Double, targetIds: Array[Long])
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private def parseParameters(args: Array[String]): Boolean = {
- if (args.length > 0) {
- fileOutput = true
- if (args.length == 5) {
- pagesInputPath = args(0)
- linksInputPath = args(1)
- outputPath = args(2)
- numPages = args(3).toLong
- maxIterations = args(4).toInt
- } else {
- System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
- "pages> <num iterations>")
- false
- }
- } else {
- System.out.println("Executing PageRank Basic example with default parameters and built-in " +
- "default data.")
- System.out.println(" Provide parameters to read input data from files.")
- System.out.println(" See the documentation for the correct format of input files.")
- System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " +
- "pages> <num iterations>")
-
- numPages = PageRankData.getNumberOfPages
- }
- true
- }
-
- private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
- if (fileOutput) {
- env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
- .map(x => x._1)
- } else {
- env.generateSequence(1, 15)
- }
- }
-
- private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
- if (fileOutput) {
- env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
- includedFields = Array(0, 1))
- } else {
- val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
- v2.asInstanceOf[Long])}
- env.fromCollection(edges)
- }
- }
-
- private var fileOutput: Boolean = false
- private var pagesInputPath: String = null
- private var linksInputPath: String = null
- private var outputPath: String = null
- private var numPages: Double = 0
- private var maxIterations: Int = 10
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
new file mode 100644
index 0000000..c0e0f6c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+* with the new ranks of all pages. This implementation terminates after a fixed number of
+* iterations. This is the Wikipedia entry for the
+* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
+*
+* Input files are plain text files and must be formatted as follows:
+*
+* - Pages represented as an (long) ID separated by new-line characters.
+* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
+* - Links are represented as pairs of page IDs which are separated by space characters. Links
+* are separated by new-line characters.
+* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12),
+* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+* at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+* PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+* - Bulk Iterations
+* - Table API expressions
+*/
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    // emit result
+    if (fileOutput) {
+      finalRanks.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      finalRanks.print()
+    }
+
+    // execute program
+    env.execute("Expression PageRank Example")
+  }
+
+  // *************************************************************************
+  // USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+  case class Page(pageId: Long, rank: Double)
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Reads the program arguments into the parameter fields at the bottom of this object.
+   *
+   * @return `false` if arguments were given but not exactly five of them, `true` otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+        true
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+      numPages = PageRankData.getNumberOfPages
+      true
+    }
+  }
+
+  /** Reads the page ids from `pagesInputPath` if arguments were given, else generates 1 to 15. */
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  /** Reads the links from `linksInputPath` if arguments were given, else uses default edges. */
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  // program parameters; the defaults are overwritten by parseParameters
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
deleted file mode 100644
index 0ff97bf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.streaming.api.scala._
-
-import org.apache.flink.api.scala.table._
-
-import scala.Stream._
-import scala.math._
-import scala.language.postfixOps
-import scala.util.Random
-
-/**
- * Simple example for demonstrating the use of the Table API with Flink Streaming.
- */
-object StreamingExpressionFilter {
-
- case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
-
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- val cars = genCarStream().toTable
- .filter('carId === 0)
- .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
- .as[CarEvent]
-
- cars.print()
-
- StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
- }
-
- def genCarStream(): DataStream[CarEvent] = {
-
- def nextSpeed(carEvent : CarEvent) : CarEvent =
- {
- val next =
- if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
- CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
- }
- def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
- {
- Thread.sleep(1000)
- speeds.append(carStream(speeds.map(nextSpeed)))
- }
- carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
- }
-
- def parseParameters(args: Array[String]): Boolean = {
- if (args.length > 0) {
- if (args.length == 3) {
- numOfCars = args(0).toInt
- evictionSec = args(1).toInt
- triggerMeters = args(2).toDouble
- true
- }
- else {
- System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
- false
- }
- }else{
- true
- }
- }
-
- var numOfCars = 2
- var evictionSec = 10
- var triggerMeters = 50d
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..4aa5653
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming.
+ */
+object StreamingTableFilter {
+
+ case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
+
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val cars = genCarStream().toTable
+ .filter('carId === 0)
+ .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+ .toStream[CarEvent]
+
+ cars.print()
+
+ StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
+
+ }
+
+ def genCarStream(): DataStream[CarEvent] = {
+
+ def nextSpeed(carEvent : CarEvent) : CarEvent =
+ {
+ val next =
+ if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+ CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
+ }
+ def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
+ {
+ Thread.sleep(1000)
+ speeds.append(carStream(speeds.map(nextSpeed)))
+ }
+ carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
+ }
+
+ def parseParameters(args: Array[String]): Boolean = {
+ if (args.length > 0) {
+ if (args.length == 3) {
+ numOfCars = args(0).toInt
+ evictionSec = args(1).toInt
+ triggerMeters = args(2).toDouble
+ true
+ }
+ else {
+ System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+ false
+ }
+ }else{
+ true
+ }
+ }
+
+ var numOfCars = 2
+ var evictionSec = 10
+ var triggerMeters = 50d
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
deleted file mode 100644
index 96ec4ba..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT
- * l_orderkey,
- * SUM(l_extendedprice*(1-l_discount)) AS revenue,
- * o_orderdate,
- * o_shippriority
- * FROM customer,
- * orders,
- * lineitem
- * WHERE
- * c_mktsegment = '[SEGMENT]'
- * AND c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND o_orderdate < date '[DATE]'
- * AND l_shipdate > date '[DATE]'
- * GROUP BY
- * l_orderkey,
- * o_orderdate,
- * o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at
- * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
- *
- * Usage:
- * {{{
- * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *
- * This example shows how to use:
- * - Table API expressions
- *
- */
-object TPCHQuery3Expression {
-
- def main(args: Array[String]) {
- if (!parseParameters(args)) {
- return
- }
-
- // set filter date
- val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
- val date = dateFormat.parse("1995-03-12")
-
- // get execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val lineitems = getLineitemDataSet(env)
- .filter( l => dateFormat.parse(l.shipDate).after(date) )
- .as('id, 'extdPrice, 'discount, 'shipDate)
-
- val customers = getCustomerDataSet(env)
- .as('id, 'mktSegment)
- .filter( 'mktSegment === "AUTOMOBILE" )
-
- val orders = getOrdersDataSet(env)
- .filter( o => dateFormat.parse(o.orderDate).before(date) )
- .as('orderId, 'custId, 'orderDate, 'shipPrio)
-
- val items =
- orders.join(customers)
- .where('custId === 'id)
- .select('orderId, 'orderDate, 'shipPrio)
- .join(lineitems)
- .where('orderId === 'id)
- .select(
- 'orderId,
- 'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
- 'orderDate,
- 'shipPrio)
-
- val result = items
- .groupBy('orderId, 'orderDate, 'shipPrio)
- .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
-
- // emit result
- result.writeAsCsv(outputPath, "\n", "|")
-
- // execute program
- env.execute("Scala TPCH Query 3 (Expression) Example")
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
- case class Customer(id: Long, mktSegment: String)
- case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private var lineitemPath: String = null
- private var customerPath: String = null
- private var ordersPath: String = null
- private var outputPath: String = null
-
- private def parseParameters(args: Array[String]): Boolean = {
- if (args.length == 4) {
- lineitemPath = args(0)
- customerPath = args(1)
- ordersPath = args(2)
- outputPath = args(3)
- true
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" +
- "<orders-csv path> <result path>");
- false
- }
- }
-
- private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
- env.readCsvFile[Lineitem](
- lineitemPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 5, 6, 10) )
- }
-
- private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
- env.readCsvFile[Customer](
- customerPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 6) )
- }
-
- private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
- env.readCsvFile[Order](
- ordersPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 1, 4, 7) )
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields using the Table API's as() operation.
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ * l_orderkey,
+ * SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ * o_orderdate,
+ * o_shippriority
+ * FROM customer,
+ * orders,
+ * lineitem
+ * WHERE
+ * c_mktsegment = '[SEGMENT]'
+ * AND c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND o_orderdate < date '[DATE]'
+ * AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ * l_orderkey,
+ * o_orderdate,
+ * o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ * - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ // set filter date
+ val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+ val date = dateFormat.parse("1995-03-12")
+
+ // get execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val lineitems = getLineitemDataSet(env)
+ .filter( l => dateFormat.parse(l.shipDate).after(date) )
+ .as('id, 'extdPrice, 'discount, 'shipDate)
+
+ val customers = getCustomerDataSet(env)
+ .as('id, 'mktSegment)
+ .filter( 'mktSegment === "AUTOMOBILE" )
+
+ val orders = getOrdersDataSet(env)
+ .filter( o => dateFormat.parse(o.orderDate).before(date) )
+ .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+ val items =
+ orders.join(customers)
+ .where('custId === 'id)
+ .select('orderId, 'orderDate, 'shipPrio)
+ .join(lineitems)
+ .where('orderId === 'id)
+ .select(
+ 'orderId,
+ 'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+ 'orderDate,
+ 'shipPrio)
+
+ val result = items
+ .groupBy('orderId, 'orderDate, 'shipPrio)
+ .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+ // emit result
+ result.writeAsCsv(outputPath, "\n", "|")
+
+ // execute program
+ env.execute("Scala TPCH Query 3 (Expression) Example")
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+ case class Customer(id: Long, mktSegment: String)
+ case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private var lineitemPath: String = null
+ private var customerPath: String = null
+ private var ordersPath: String = null
+ private var outputPath: String = null
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if (args.length == 4) {
+ lineitemPath = args(0)
+ customerPath = args(1)
+ ordersPath = args(2)
+ outputPath = args(3)
+ true
+ } else {
+ System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+ " Due to legal restrictions, we can not ship generated data.\n" +
+ " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+ " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" +
+ "<orders-csv path> <result path>");
+ false
+ }
+ }
+
+ private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+ env.readCsvFile[Lineitem](
+ lineitemPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 5, 6, 10) )
+ }
+
+ private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+ env.readCsvFile[Customer](
+ customerPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 6) )
+ }
+
+ private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+ env.readCsvFile[Order](
+ ordersPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 1, 4, 7) )
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 0b2a5df..60fb984 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -85,10 +85,9 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
+ Table table = tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
- Table<JavaBatchTranslator> result =
- table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+ Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
DataSet<Row> ds = tableEnv.toSet(result, Row.class);
ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
@@ -103,10 +102,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env));
- Table<JavaBatchTranslator> result =
+ Table result =
table.select("foo.avg");
DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -127,10 +126,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(input);
- Table<JavaBatchTranslator> result =
+ Table result =
table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
DataSet<Row> ds = tableEnv.toSet(result, Row.class);
@@ -151,10 +150,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
new Tuple2<Float, String>(1f, "Hello"),
new Tuple2<Float, String>(2f, "Ciao"));
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(input);
- Table<JavaBatchTranslator> result =
+ Table result =
table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
@@ -174,10 +173,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f,
"Hello"));
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(input);
- Table<JavaBatchTranslator> result =
+ Table result =
table.select("f1.sum");
@@ -196,10 +195,10 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(input);
- Table<JavaBatchTranslator> result =
+ Table result =
table.select("f0.sum.sum");
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index ee877e9..6ec3187 100644
--- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -65,7 +65,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -86,7 +86,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b");
DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -102,7 +102,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -118,7 +118,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -134,7 +134,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
DataSet<Row> ds = tableEnv.toSet(table, Row.class);
@@ -150,7 +150,7 @@ public class AsITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
- Table<JavaBatchTranslator> table =
+ Table table =
tableEnv.toTable(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
" c");