You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:38 UTC

[09/12] flink git commit: [FLINK-1623] Rename Expression API to Table API

[FLINK-1623] Rename Expression API to Table API

Package name is now flink-table. ExpressionOperation is renamed to
Table.

This also adds more JavaDoc and ScalaDoc.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9519c8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9519c8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9519c8d

Branch: refs/heads/master
Commit: c9519c8d6c869d2bfab186e449f0ad2b62484805
Parents: d7d9b63
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 18 14:44:42 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Mar 29 12:27:53 2015 +0200

----------------------------------------------------------------------
 docs/linq.md                                    |  90 ++-
 .../org/apache/flink/api/scala/DataSet.scala    |   2 +-
 flink-staging/flink-expressions/pom.xml         | 246 -------
 .../api/java/expressions/package-info.java      |  22 -
 .../examples/java/JavaExpressionExample.java    |  69 --
 .../api/expressions/ExpressionException.scala   |  23 -
 .../api/expressions/ExpressionOperation.scala   | 245 -------
 .../org/apache/flink/api/expressions/Row.scala  |  38 --
 .../api/expressions/analysis/Analyzer.scala     |  38 --
 .../analysis/ExtractEquiJoinFields.scala        |  70 --
 .../expressions/analysis/GroupByAnalyzer.scala  |  48 --
 .../expressions/analysis/InsertAutoCasts.scala  |  91 ---
 .../analysis/PredicateAnalyzer.scala            |  32 -
 .../analysis/ResolveFieldReferences.scala       |  57 --
 .../flink/api/expressions/analysis/Rule.scala   |  30 -
 .../analysis/SelectionAnalyzer.scala            |  33 -
 .../api/expressions/analysis/TypeCheck.scala    |  56 --
 .../expressions/analysis/VerifyBoolean.scala    |  40 --
 .../analysis/VerifyNoAggregates.scala           |  51 --
 .../analysis/VerifyNoNestedAggregates.scala     |  52 --
 .../codegen/ExpressionCodeGenerator.scala       | 635 -------------------
 .../codegen/GenerateBinaryPredicate.scala       |  73 ---
 .../codegen/GenerateBinaryResultAssembler.scala |  60 --
 .../codegen/GenerateResultAssembler.scala       |  99 ---
 .../codegen/GenerateUnaryPredicate.scala        |  67 --
 .../codegen/GenerateUnaryResultAssembler.scala  |  57 --
 .../flink/api/expressions/codegen/package.scala |  25 -
 .../operations/ExpandAggregations.scala         | 144 -----
 .../operations/OperationTranslator.scala        |  35 -
 .../api/expressions/operations/operations.scala | 101 ---
 .../api/expressions/operations/package.scala    |  24 -
 .../expressions/parser/ExpressionParser.scala   | 209 ------
 .../runtime/ExpressionAggregateFunction.scala   |  72 ---
 .../runtime/ExpressionFilterFunction.scala      |  47 --
 .../runtime/ExpressionJoinFunction.scala        |  76 ---
 .../runtime/ExpressionSelectFunction.scala      |  51 --
 .../flink/api/expressions/runtime/package.scala |  23 -
 .../flink/api/expressions/tree/Expression.scala | 149 -----
 .../api/expressions/tree/aggregations.scala     |  99 ---
 .../flink/api/expressions/tree/arithmetic.scala | 145 -----
 .../flink/api/expressions/tree/cast.scala       |  24 -
 .../flink/api/expressions/tree/comparison.scala |  93 ---
 .../api/expressions/tree/fieldExpression.scala  |  41 --
 .../flink/api/expressions/tree/literals.scala   |  40 --
 .../flink/api/expressions/tree/logic.scala      |  58 --
 .../flink/api/expressions/tree/package.scala    |  29 -
 .../expressions/tree/stringExpressions.scala    |  46 --
 .../expressions/typeinfo/RenameOperator.scala   |  36 --
 .../typeinfo/RenamingProxyTypeInfo.scala        | 109 ----
 .../expressions/typeinfo/RowSerializer.scala    | 121 ----
 .../api/expressions/typeinfo/RowTypeInfo.scala  |  51 --
 .../api/java/expressions/ExpressionUtil.scala   | 112 ----
 .../scala/expressions/DataSetConversions.scala  |  66 --
 .../expressions/DataStreamConversions.scala     |  65 --
 .../scala/expressions/JavaBatchTranslator.scala | 392 ------------
 .../expressions/JavaStreamingTranslator.scala   | 303 ---------
 .../expressions/ScalaBatchTranslator.scala      |  55 --
 .../expressions/ScalaStreamingTranslator.scala  |  56 --
 .../api/scala/expressions/expressionDsl.scala   | 124 ----
 .../flink/api/scala/expressions/package.scala   | 102 ---
 .../examples/scala/PageRankExpression.scala     | 210 ------
 .../scala/StreamingExpressionFilter.scala       |  90 ---
 .../examples/scala/TPCHQuery3Expression.scala   | 174 -----
 .../expressions/test/AggregationsITCase.java    | 210 ------
 .../api/java/expressions/test/AsITCase.java     | 158 -----
 .../java/expressions/test/CastingITCase.java    | 130 ----
 .../expressions/test/ExpressionsITCase.java     | 192 ------
 .../api/java/expressions/test/FilterITCase.java | 130 ----
 .../test/GroupedAggregationsITCase.java         | 126 ----
 .../api/java/expressions/test/JoinITCase.java   | 202 ------
 .../api/java/expressions/test/SelectITCase.java | 169 -----
 .../test/StringExpressionsITCase.java           | 144 -----
 .../test/PageRankExpressionITCase.java          | 100 ---
 .../expressions/test/AggregationsITCase.scala   | 127 ----
 .../api/scala/expressions/test/AsITCase.scala   | 124 ----
 .../scala/expressions/test/CastingITCase.scala  |  92 ---
 .../expressions/test/ExpressionsITCase.scala    | 127 ----
 .../scala/expressions/test/FilterITCase.scala   | 151 -----
 .../test/GroupedAggreagationsITCase.scala       |  96 ---
 .../api/scala/expressions/test/JoinITCase.scala | 145 -----
 .../scala/expressions/test/SelectITCase.scala   | 143 -----
 .../test/StringExpressionsITCase.scala          |  98 ---
 .../flink/streaming/api/scala/DataStream.scala  |   5 +
 flink-staging/flink-table/pom.xml               | 246 +++++++
 .../flink/api/java/table/package-info.java      |  60 ++
 .../apache/flink/api/table/package-info.java    |  33 +
 .../flink/examples/java/JavaTableExample.java   |  71 +++
 .../api/java/table/JavaBatchTranslator.scala    | 319 ++++++++++
 .../java/table/JavaStreamingTranslator.scala    | 236 +++++++
 .../flink/api/java/table/TableEnvironment.scala | 112 ++++
 .../api/scala/table/DataSetConversions.scala    |  67 ++
 .../api/scala/table/DataStreamConversions.scala |  68 ++
 .../api/scala/table/ScalaBatchTranslator.scala  |  68 ++
 .../scala/table/ScalaStreamingTranslator.scala  |  58 ++
 .../flink/api/scala/table/expressionDsl.scala   | 124 ++++
 .../apache/flink/api/scala/table/package.scala  | 101 +++
 .../flink/api/table/ExpressionException.scala   |  23 +
 .../scala/org/apache/flink/api/table/Row.scala  |  38 ++
 .../org/apache/flink/api/table/Table.scala      | 243 +++++++
 .../flink/api/table/analysis/Analyzer.scala     |  38 ++
 .../table/analysis/ExtractEquiJoinFields.scala  |  70 ++
 .../api/table/analysis/GroupByAnalyzer.scala    |  48 ++
 .../api/table/analysis/InsertAutoCasts.scala    |  91 +++
 .../api/table/analysis/PredicateAnalyzer.scala  |  32 +
 .../table/analysis/ResolveFieldReferences.scala |  57 ++
 .../apache/flink/api/table/analysis/Rule.scala  |  30 +
 .../api/table/analysis/SelectionAnalyzer.scala  |  33 +
 .../flink/api/table/analysis/TypeCheck.scala    |  56 ++
 .../api/table/analysis/VerifyBoolean.scala      |  40 ++
 .../api/table/analysis/VerifyNoAggregates.scala |  51 ++
 .../analysis/VerifyNoNestedAggregates.scala     |  52 ++
 .../table/codegen/ExpressionCodeGenerator.scala | 635 +++++++++++++++++++
 .../table/codegen/GenerateBinaryPredicate.scala |  73 +++
 .../codegen/GenerateBinaryResultAssembler.scala |  60 ++
 .../table/codegen/GenerateResultAssembler.scala |  99 +++
 .../table/codegen/GenerateUnaryPredicate.scala  |  67 ++
 .../codegen/GenerateUnaryResultAssembler.scala  |  57 ++
 .../flink/api/table/codegen/package.scala       |  25 +
 .../table/operations/ExpandAggregations.scala   | 144 +++++
 .../api/table/operations/TableTranslator.scala  | 158 +++++
 .../flink/api/table/operations/operations.scala | 101 +++
 .../flink/api/table/operations/package.scala    |  24 +
 .../org/apache/flink/api/table/package.scala    |  34 +
 .../api/table/parser/ExpressionParser.scala     | 209 ++++++
 .../runtime/ExpressionAggregateFunction.scala   |  72 +++
 .../runtime/ExpressionFilterFunction.scala      |  47 ++
 .../table/runtime/ExpressionJoinFunction.scala  |  76 +++
 .../runtime/ExpressionSelectFunction.scala      |  51 ++
 .../flink/api/table/runtime/package.scala       |  23 +
 .../flink/api/table/tree/Expression.scala       | 149 +++++
 .../flink/api/table/tree/aggregations.scala     |  99 +++
 .../flink/api/table/tree/arithmetic.scala       | 145 +++++
 .../org/apache/flink/api/table/tree/cast.scala  |  24 +
 .../flink/api/table/tree/comparison.scala       |  93 +++
 .../flink/api/table/tree/fieldExpression.scala  |  41 ++
 .../apache/flink/api/table/tree/literals.scala  |  40 ++
 .../org/apache/flink/api/table/tree/logic.scala |  58 ++
 .../apache/flink/api/table/tree/package.scala   |  29 +
 .../api/table/tree/stringExpressions.scala      |  46 ++
 .../api/table/typeinfo/RenameOperator.scala     |  36 ++
 .../table/typeinfo/RenamingProxyTypeInfo.scala  | 109 ++++
 .../api/table/typeinfo/RowSerializer.scala      | 121 ++++
 .../flink/api/table/typeinfo/RowTypeInfo.scala  |  51 ++
 .../examples/scala/PageRankExpression.scala     | 210 ++++++
 .../scala/StreamingExpressionFilter.scala       |  90 +++
 .../examples/scala/TPCHQuery3Expression.scala   | 174 +++++
 .../api/java/table/test/AggregationsITCase.java | 215 +++++++
 .../flink/api/java/table/test/AsITCase.java     | 165 +++++
 .../api/java/table/test/CastingITCase.java      | 133 ++++
 .../api/java/table/test/ExpressionsITCase.java  | 197 ++++++
 .../flink/api/java/table/test/FilterITCase.java | 133 ++++
 .../table/test/GroupedAggregationsITCase.java   | 131 ++++
 .../flink/api/java/table/test/JoinITCase.java   | 216 +++++++
 .../flink/api/java/table/test/SelectITCase.java | 180 ++++++
 .../table/test/StringExpressionsITCase.java     | 150 +++++
 .../table/test/PageRankExpressionITCase.java    | 100 +++
 .../scala/table/test/AggregationsITCase.scala   | 127 ++++
 .../flink/api/scala/table/test/AsITCase.scala   | 124 ++++
 .../api/scala/table/test/CastingITCase.scala    |  92 +++
 .../scala/table/test/ExpressionsITCase.scala    | 127 ++++
 .../api/scala/table/test/FilterITCase.scala     | 151 +++++
 .../table/test/GroupedAggreagationsITCase.scala |  96 +++
 .../flink/api/scala/table/test/JoinITCase.scala | 145 +++++
 .../api/scala/table/test/SelectITCase.scala     | 143 +++++
 .../table/test/StringExpressionsITCase.scala    |  98 +++
 flink-staging/pom.xml                           |   2 +-
 166 files changed, 8727 insertions(+), 8523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/docs/linq.md
----------------------------------------------------------------------
diff --git a/docs/linq.md b/docs/linq.md
index ebb0063..79fe6f2 100644
--- a/docs/linq.md
+++ b/docs/linq.md
@@ -1,5 +1,5 @@
 ---
-title: "Language-Integrated Queries"
+title: "Language-Integrated Queries (Table API)"
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -23,58 +23,92 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-**Language-Integrated Queries are an experimental feature and can currently only be used with
-the Scala API**
+**Language-Integrated Queries are an experimental feature**
 
-Flink provides an API that allows specifying operations using SQL-like expressions.
-This Expression API can be enabled by importing
-`org.apache.flink.api.scala.expressions._`.  This enables implicit conversions that allow
-converting a `DataSet` or `DataStream` to an `ExpressionOperation` on which relational queries
-can be specified. This example shows how a `DataSet` can be converted, how expression operations
-can be specified and how an expression operation can be converted back to a `DataSet`:
+Flink provides an API that allows specifying operations using SQL-like expressions. Instead of 
+manipulating `DataSet` or `DataStream` you work with `Table` on which relational operations can
+be performed. 
+
+The following dependency must be added to your project when using the Table API:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT }}</version>
+</dependency>
+{% endhighlight %}
+
+## Scala Table API
+ 
+The Table API can be enabled by importing `org.apache.flink.api.scala.table._`.  This enables
+implicit conversions that allow
+converting a DataSet or DataStream to a Table. This example shows how a DataSet can
+be converted, how relational queries can be specified and how a Table can be
+converted back to a DataSet:
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._ 
+import org.apache.flink.api.scala.table._ 
 
 case class WC(word: String, count: Int)
 val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-val expr = input.toExpression
-val result = expr.groupBy('word).select('word, 'count.sum).as[WC]
+val expr = input.toTable
+val result = expr.groupBy('word).select('word, 'count.sum).toSet[WC]
 {% endhighlight %}
 
 The expression DSL uses Scala symbols to refer to field names and we use code generation to
-transform expressions to efficient runtime code. Please not that the conversion to and from
-expression operations only works when using Scala case classes or Flink POJOs. Please check out
+transform expressions to efficient runtime code. Please note that the conversion to and from
+Tables only works when using Scala case classes or Flink POJOs. Please check out
 the [programming guide](programming_guide.html) to learn the requirements for a class to be 
 considered a POJO.
  
 This is another example that shows how you
-can join to operations:
+can join two Tables:
 
 {% highlight scala %}
 case class MyResult(a: String, b: Int)
 
 val input1 = env.fromElements(...).as('a, 'b)
 val input2 = env.fromElements(...).as('c, 'd)
-val joined = input1.join(input2).where('b == 'a && 'd > 42).select('a, 'd).as[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").as[MyResult]
 {% endhighlight %}
 
-Notice, how a `DataSet` can be converted to an expression operation by using `as` and specifying new
-names for the fields. This can also be used to disambiguate fields before a join operation.
+Notice how a DataSet can be converted to a Table by using `as` and specifying new
+names for the fields. This can also be used to disambiguate fields before a join operation. This
+example also shows that you can use Strings to specify relational expressions.
 
-The Expression API can be used with the Streaming API, since we also have implicit conversions to
-and from `DataStream`.
+Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
+description of the expression syntax. 
 
-The following dependency must be added to your project when using the Expression API:
+## Java Table API
 
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-expressions</artifactId>
-  <version>{{site.FLINK_VERSION_SHORT }}</version>
-</dependency>
+When using Java, Tables can be converted to and from DataSet and DataStream using `TableEnvironment`.
+This example is equivalent to the above Scala Example:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+TableEnvironment tableEnv = new TableEnvironment();
+
+DataSet<WC> input = env.fromElements(
+        new WC("Hello", 1),
+        new WC("Ciao", 1),
+        new WC("Hello", 1));
+
+Table table = tableEnv.toTable(input);
+
+Table filtered = table
+        .groupBy("word")
+        .select("word.count as count, word")
+        .filter("count = 2");
+
+DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
 {% endhighlight %}
 
-Please refer to the scaladoc for a full list of supported operations and a description of the
+When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
+are supported. They support exactly the same feature set as the expression DSL.
+
+Please refer to the Javadoc for a full list of supported operations and a description of the
 expression syntax. 
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 2732112..de07a57 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -90,7 +90,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Returns the TypeInformation for the elements of this DataSet.
    */
-  def getType: TypeInformation[T] = set.getType
+  def getType(): TypeInformation[T] = set.getType
 
   /**
    * Returns the execution environment associated with the current DataSet.

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/pom.xml b/flink-staging/flink-expressions/pom.xml
deleted file mode 100644
index f26ab03..0000000
--- a/flink-staging/flink-expressions/pom.xml
+++ /dev/null
@@ -1,246 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-expressions</artifactId>
-	<name>flink-expressions</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-reflect</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-					<compilerPlugins combine.children="append">
-						<compilerPlugin>
-							<groupId>org.scalamacros</groupId>
-							<artifactId>paradise_${scala.version}</artifactId>
-							<version>${scala.macros.version}</version>
-						</compilerPlugin>
-					</compilerPlugins>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<version>0.5.0</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<verbose>false</verbose>
-					<failOnViolation>true</failOnViolation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<failOnWarning>false</failOnWarning>
-					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-					<outputEncoding>UTF-8</outputEncoding>
-				</configuration>
-			</plugin>
-
-		</plugins>
-	</build>
-
-	<profiles>
-		<profile>
-			<id>scala-2.10</id>
-			<activation>
-				<property>
-					<!-- this is the default scala profile -->
-					<name>!scala-2.11</name>
-				</property>
-			</activation>
-			<dependencies>
-				<dependency>
-					<groupId>org.scalamacros</groupId>
-					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
-					<version>${scala.macros.version}</version>
-				</dependency>
-			</dependencies>
-		</profile>
-	</profiles>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
deleted file mode 100644
index 07e18b2..0000000
--- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package doc wohoooo
- */
-package org.apache.flink.api.java.expressions;

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
deleted file mode 100644
index 42632f9..0000000
--- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-
-/**
- * Very simple example that shows how the Java Expression API can be used.
- */
-public class JavaExpressionExample {
-
-	public static class WC {
-		public String word;
-		public int count;
-
-		public WC() {
-
-		}
-
-		public WC(String word, int count) {
-			this.word = word;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return "WC " + word + " " + count;
-		}
-	}
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-
-		DataSet<WC> input = env.fromElements(
-				new WC("Hello", 1),
-				new WC("Ciao", 1),
-				new WC("Hello", 1));
-
-		ExpressionOperation expr = ExpressionUtil.from(input);
-
-		ExpressionOperation filtered = expr
-				.groupBy("word")
-				.select("word.count as count, word")
-				.filter("count = 2");
-
-		DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class);
-
-		result.print();
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
deleted file mode 100644
index 34e400f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * Exception for all errors occurring during expression operations.
- */
-class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
deleted file mode 100644
index 38417b2..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer,
-PredicateAnalyzer}
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-
-/**
- * The abstraction for writing expression API programs. Similar to how the batch and streaming APIs
- * have [[org.apache.flink.api.scala.DataSet]] and
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- *
- * Use the methods of [[ExpressionOperation]] to transform data or to revert back to the underlying
- * batch or streaming representation.
- */
-case class ExpressionOperation[A <: OperationTranslator](
-    private[flink] val operation: Operation,
-    private[flink] val operationTranslator: A) {
-
-
-  /**
-   * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or
-   * [[org.apache.flink.streaming.api.scala.DataStream]]. The conversion is carried out by the
-   * `operationTranslator`; `tpe` is the type information for the requested element type `O`.
-   */
-  def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] =
-    operationTranslator.translate(operation)
-
-  /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
-   * can contain complex expressions and aggregations. The names of the resulting fields must be
-   * unique, otherwise an [[ExpressionException]] is thrown.
-   *
-   * Example:
-   * {{{
-   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
-   * }}}
-   */
-  def select(fields: Expression*): ExpressionOperation[A] = {
-    val selectionAnalyzer = new SelectionAnalyzer(operation.outputFields)
-    val resolved = fields.map(expr => selectionAnalyzer.analyze(expr))
-    val names = resolved.map(_.name)
-    if (names.distinct.size != names.size) {
-      throw new ExpressionException(s"Resulting fields names are not unique in expression" +
-        s""" "${fields.mkString(", ")}".""")
-    }
-    copy(operation = Select(operation, resolved))
-  }
-
-  /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
-   * are given as a string that is parsed by [[ExpressionParser]] into an expression list and
-   * can contain complex expressions and aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select("key, value.avg + ' The average' as average, other.substring(0, 10)")
-   * }}}
-   */
-  def select(fields: String): ExpressionOperation[A] = {
-    select(ExpressionParser.parseExpressionList(fields): _*)
-  }
-
-  /**
-   * Renames the fields of the expression result. Use this to disambiguate fields before
-   * joining two operations. Only plain field references are accepted as new names.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as('a, 'b)
-   * }}}
-   */
-  def as(fields: Expression*): ExpressionOperation[A] = {
-    // anything other than a plain (still unresolved) field reference cannot serve as a name
-    val onlyFieldReferences = fields.forall(_.isInstanceOf[UnresolvedFieldReference])
-    if (!onlyFieldReferences) {
-      throw new ExpressionException("Only field expression allowed in as().")
-    }
-    val newNames = fields.toArray.map(_.name)
-    copy(operation = As(operation, newNames))
-  }
-
-  /**
-   * Renames the fields of the expression result. Use this to disambiguate fields before
-   * joining two operations. The new names are given as a string that is parsed by
-   * [[ExpressionParser]] into an expression list.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as("a, b")
-   * }}}
-   */
-  def as(fields: String): ExpressionOperation[A] = {
-    as(ExpressionParser.parseExpressionList(fields): _*)
-  }
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-   * clause. The predicate is checked by a [[PredicateAnalyzer]] against the fields of this
-   * operation before the filter is added.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter('name === "Fred")
-   * }}}
-   */
-  def filter(predicate: Expression): ExpressionOperation[A] = {
-    val checkedPredicate = new PredicateAnalyzer(operation.outputFields).analyze(predicate)
-    copy(operation = Filter(operation, checkedPredicate))
-  }
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-   * clause. The predicate is given as a string that is parsed by [[ExpressionParser]] into an
-   * expression.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter("name === 'Fred'")
-   * }}}
-   */
-  def filter(predicate: String): ExpressionOperation[A] = {
-    filter(ExpressionParser.parseExpression(predicate))
-  }
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-   * clause. This simply forwards to `filter`.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where('name === "Fred")
-   * }}}
-   */
-  def where(predicate: Expression): ExpressionOperation[A] = {
-    filter(predicate)
-  }
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-   * clause. This simply forwards to the string variant of `filter`.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where("name === 'Fred'")
-   * }}}
-   */
-  def where(predicate: String): ExpressionOperation[A] = {
-    filter(predicate)
-  }
-
-  /**
-   * Groups the elements on some grouping keys. Use this before a selection with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
-   * Only field references are valid grouping keys.
-   *
-   * Example:
-   * {{{
-   *   in.groupBy('key).select('key, 'value.avg)
-   * }}}
-   */
-  def groupBy(fields: Expression*): ExpressionOperation[A] = {
-    val groupByAnalyzer = new GroupByAnalyzer(operation.outputFields)
-    val keys = fields.map(key => groupByAnalyzer.analyze(key))
-    // every grouping key has to be a resolved field reference, collect the ones that are not
-    val rejected = keys.filterNot {
-      case _: ResolvedFieldReference => true
-      case _ => false
-    }
-
-    if (rejected.nonEmpty) {
-      throw new ExpressionException("Illegal key expressions: " + rejected.mkString(", "))
-    }
-
-    copy(operation = GroupBy(operation, keys))
-  }
-
-  /**
-   * Groups the elements on some grouping keys. Use this before a selection with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
-   * The keys are given as a string that is parsed by [[ExpressionParser]].
-   *
-   * Example:
-   *
-   * {{{
-   *   in.groupBy("key").select("key, value.avg")
-   * }}}
-   */
-  def groupBy(fields: String): ExpressionOperation[A] = {
-    groupBy(ExpressionParser.parseExpressionList(fields): _*)
-  }
-
-  /**
-   * Joins two expression operations. Similar to an SQL join. The fields of the two joined
-   * operations must not overlap, use [[as]] to rename fields if necessary. You can use
-   * where and select clauses after a join to further specify the behaviour of the join.
-   *
-   * Example:
-   *
-   * {{{
-   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
-   * }}}
-   */
-  def join(right: ExpressionOperation[A]): ExpressionOperation[A] = {
-    val leftFields = operation.outputFields
-    val rightFields = right.operation.outputFields
-    val leftNames = leftFields.map(_._1).toSet
-    // a field name that occurs on both sides could not be referenced unambiguously afterwards
-    if (rightFields.exists { case (name, _) => leftNames.contains(name) }) {
-      throw new ExpressionException(
-        "Overlapping fields names on join input, result would be ambiguous: " +
-          leftFields.mkString(", ") + " and " + rightFields.mkString(", "))
-    }
-    copy(operation = Join(operation, right.operation))
-  }
-
-  override def toString: String = s"Expression($operation)" // shows the wrapped operation
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
deleted file mode 100644
index 47ef59e..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * Mutable, fixed-arity record used for executing expression operations. We use manually
- * generated TypeInfo to check the field types and create serializers and comparators.
- */
-class Row(arity: Int) extends Product {
-
-  private val fields = Array.ofDim[Any](arity)
-
-  def productArity: Int = fields.length
-
-  def productElement(i: Int): Any = fields.apply(i)
-
-  def setField(i: Int, value: Any): Unit = fields.update(i, value)
-
-  def canEqual(that: Any): Boolean = false
-
-  override def toString: String = fields.mkString(",")
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
deleted file mode 100644
index da71cdd..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-
-/**
- * Base class for expression analyzers/transformers. Subclasses implement `rules` to provide
- * the chain of rules that are invoked one after another. The expression resulting from one
- * rule is fed into the next rule and the final result is returned from method `analyze`.
- */
-abstract class Analyzer {
-
-  /** The rule chain, in the order in which the rules are applied. */
-  def rules: Seq[Rule]
-
-  /**
-   * Threads `expr` through `rules`; each rule receives the result of the previous one.
-   */
-  final def analyze(expr: Expression): Expression =
-    rules.foldLeft(expr) { (tree, rule) => rule(tree) }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
deleted file mode 100644
index a4f8f25..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import scala.collection.mutable
-
-/**
- * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
- * expression without the equi-join predicates together with indices of the join fields
- * from both the left and right input. Equalities not relating both inputs stay in the predicate.
- */
-object ExtractEquiJoinFields {
-  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
-
-    val joinFieldsLeft = mutable.MutableList[Int]()
-    val joinFieldsRight = mutable.MutableList[Int]()
-
-    val equiJoinExprs = mutable.MutableList[EqualTo]()
-    // First get all `===` expressions that are not below an `Or` (the result tree is discarded)
-    predicate.transformPre {
-      case or@Or(_, _) => NopExpression()
-      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
-        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(le.name)
-          joinFieldsRight += rightType.getFieldIndex(re.name)
-          equiJoinExprs += eq
-        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(re.name)
-          joinFieldsRight += rightType.getFieldIndex(le.name)
-          equiJoinExprs += eq
-        }
-        // any other equality is not an equi-join predicate and must stay in the residual
-        eq
-    }
-
-    // then remove the equi join expressions from the predicate
-    val resultExpr = predicate.transformPost {
-      // For OR, we can eliminate the OR since the equi join
-      // predicate is evaluated before the expression is evaluated
-      case or@Or(NopExpression(), _) => NopExpression()
-      case or@Or(_, NopExpression()) => NopExpression()
-      // For AND we replace it with the other expression, since the
-      // equi join predicate will always be true
-      case and@And(NopExpression(), other) => other
-      case and@And(other, NopExpression()) => other
-      case eq : EqualTo if equiJoinExprs.contains(eq) =>
-        NopExpression()
-    }
-
-    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
deleted file mode 100644
index 21f989c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.collection.mutable
-
-
-/**
- * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
- */
-class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
-
-  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
-
-  /** Accepts resolved field references unchanged and rejects every other expression. */
-  object CheckGroupExpression extends Rule {
-
-    def apply(expr: Expression) = {
-      expr match {
-        case _: ResolvedFieldReference => // this is OK
-        case _ =>
-          throw new ExpressionException(
-            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
-      }
-      // validation only, the expression itself is passed on untouched
-      expr
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
deleted file mode 100644
index 319e72f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
-
-/**
- * [[Rule]] that adds [[Cast]]s to binary operations whose two operands differ in type.
- */
-class InsertAutoCasts extends Rule {
-
-  def apply(expr: Expression) = {
-    val result = expr.transformPost {
-
-      case plus@Plus(o1, o2) =>
-        // Plus is a special case since we can cast anything to String for String concat
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(Cast(o1, o2.typeInfo), o2)
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(o1, Cast(o2, o1.typeInfo))
-          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
-          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
-          } else {
-            plus
-          }
-        } else {
-          plus
-        }
-      // arithmetic and comparisons: cast the operand whose type can be cast to the other's type
-      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
-        ba.isInstanceOf[BinaryComparison] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-      // bitwise operations: same casting scheme, but only when both operands are integer typed
-      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
-          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-    }
-
-    result
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
deleted file mode 100644
index f108f5c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * Analyzer for unary predicates, i.e. filter operations. The rules run in the listed order.
- */
-class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields), // bind field names to the input fields
-    new InsertAutoCasts, // add casts where the operand types of a binary operation differ
-    new TypeCheck, // trigger the type analysis of every sub-expression
-    new VerifyNoAggregates,
-    new VerifyBoolean)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
deleted file mode 100644
index 693dd88..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions._
-
-import scala.collection.mutable
-
-/**
- * Rule that resolves field references. This rule verifies that field references point to
- * existing fields of the input operation and creates [[ResolvedFieldReference]]s that hold the
- * field [[TypeInformation]] in addition to the field name.
- */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule {
-
-  def apply(expr: Expression) = {
-    val problems = mutable.MutableList[String]()
-
-    val resolved = expr.transformPost {
-      case unresolved@UnresolvedFieldReference(fieldName) =>
-        inputFields.find { case (name, _) => name == fieldName } match {
-          case Some((_, fieldType)) =>
-            ResolvedFieldReference(fieldName, fieldType)
-          case None =>
-            // record the problem and keep going so that all of them get reported together
-            problems +=
-              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
-            unresolved
-        }
-    }
-
-    if (problems.nonEmpty) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${problems.mkString(" ")}""")
-    }
-
-    resolved
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
deleted file mode 100644
index 853ee7a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-
-/**
- * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets an
- * [[Expression]] and must return an expression. The returned [[Expression]] can also be
- * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one
- * [[Rule]] is fed into the next [[Rule]] in the chain.
- */
-abstract class Rule {
-  def apply(expr: Expression): Expression
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
deleted file mode 100644
index eca007f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * This analyzes selection expressions. The rules run in the listed order.
- */
-class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
-
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields), // bind field names to the input fields
-    new VerifyNoNestedAggregates,
-    new InsertAutoCasts, // add casts where the operand types of a binary operation differ
-    new TypeCheck) // trigger the type analysis of every sub-expression
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
deleted file mode 100644
index 632daa3..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.expressions.{_}
-
-import scala.collection.mutable
-
-/**
- * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
- * Expressions are expected to perform type verification in this method.
- */
-class TypeCheck extends Rule {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case expr: Expression=> {
-        // simply get the typeInfo from the expression. this will perform type analysis
-        try {
-          expr.typeInfo
-        } catch {
-          case e: ExpressionException =>
-            errors += e.getMessage
-        }
-        expr
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
deleted file mode 100644
index d0bd6b6..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
-import org.apache.flink.api.expressions.{_}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-import scala.collection.mutable
-
-/**
- * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
- * for filter/join predicates.
- */
-class VerifyBoolean extends Rule {
-
-  def apply(expr: Expression) = {
-    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
-    }
-
-    expr
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
deleted file mode 100644
index e9f8788..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.expressions.tree.{Aggregation, Expression}
-
-import scala.collection.mutable
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations. Right now, join
- * predicates and filter predicates cannot contain aggregates.
- */
-class VerifyNoAggregates extends Rule {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation=> {
-        errors +=
-          s"""Aggregations are not allowed in join/filter predicates."""
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
deleted file mode 100644
index de5063a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.expressions.tree.{Expression, Aggregation}
-
-import scala.collection.mutable
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations
- * as children of aggregate operations.
- */
-class VerifyNoNestedAggregates extends Rule {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation=> {
-        if (agg.child.exists(_.isInstanceOf[Aggregation])) {
-          errors += s"""Found nested aggregation inside "$agg"."""
-        }
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}