You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:38 UTC
[09/12] flink git commit: [FLINK-1623] Rename Expression API to Table
API
[FLINK-1623] Rename Expression API to Table API
Package name is now flink-table. ExpressionOperation is renamed to
Table.
This also adds more JavaDoc and ScalDoc.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9519c8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9519c8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9519c8d
Branch: refs/heads/master
Commit: c9519c8d6c869d2bfab186e449f0ad2b62484805
Parents: d7d9b63
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 18 14:44:42 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Mar 29 12:27:53 2015 +0200
----------------------------------------------------------------------
docs/linq.md | 90 ++-
.../org/apache/flink/api/scala/DataSet.scala | 2 +-
flink-staging/flink-expressions/pom.xml | 246 -------
.../api/java/expressions/package-info.java | 22 -
.../examples/java/JavaExpressionExample.java | 69 --
.../api/expressions/ExpressionException.scala | 23 -
.../api/expressions/ExpressionOperation.scala | 245 -------
.../org/apache/flink/api/expressions/Row.scala | 38 --
.../api/expressions/analysis/Analyzer.scala | 38 --
.../analysis/ExtractEquiJoinFields.scala | 70 --
.../expressions/analysis/GroupByAnalyzer.scala | 48 --
.../expressions/analysis/InsertAutoCasts.scala | 91 ---
.../analysis/PredicateAnalyzer.scala | 32 -
.../analysis/ResolveFieldReferences.scala | 57 --
.../flink/api/expressions/analysis/Rule.scala | 30 -
.../analysis/SelectionAnalyzer.scala | 33 -
.../api/expressions/analysis/TypeCheck.scala | 56 --
.../expressions/analysis/VerifyBoolean.scala | 40 --
.../analysis/VerifyNoAggregates.scala | 51 --
.../analysis/VerifyNoNestedAggregates.scala | 52 --
.../codegen/ExpressionCodeGenerator.scala | 635 -------------------
.../codegen/GenerateBinaryPredicate.scala | 73 ---
.../codegen/GenerateBinaryResultAssembler.scala | 60 --
.../codegen/GenerateResultAssembler.scala | 99 ---
.../codegen/GenerateUnaryPredicate.scala | 67 --
.../codegen/GenerateUnaryResultAssembler.scala | 57 --
.../flink/api/expressions/codegen/package.scala | 25 -
.../operations/ExpandAggregations.scala | 144 -----
.../operations/OperationTranslator.scala | 35 -
.../api/expressions/operations/operations.scala | 101 ---
.../api/expressions/operations/package.scala | 24 -
.../expressions/parser/ExpressionParser.scala | 209 ------
.../runtime/ExpressionAggregateFunction.scala | 72 ---
.../runtime/ExpressionFilterFunction.scala | 47 --
.../runtime/ExpressionJoinFunction.scala | 76 ---
.../runtime/ExpressionSelectFunction.scala | 51 --
.../flink/api/expressions/runtime/package.scala | 23 -
.../flink/api/expressions/tree/Expression.scala | 149 -----
.../api/expressions/tree/aggregations.scala | 99 ---
.../flink/api/expressions/tree/arithmetic.scala | 145 -----
.../flink/api/expressions/tree/cast.scala | 24 -
.../flink/api/expressions/tree/comparison.scala | 93 ---
.../api/expressions/tree/fieldExpression.scala | 41 --
.../flink/api/expressions/tree/literals.scala | 40 --
.../flink/api/expressions/tree/logic.scala | 58 --
.../flink/api/expressions/tree/package.scala | 29 -
.../expressions/tree/stringExpressions.scala | 46 --
.../expressions/typeinfo/RenameOperator.scala | 36 --
.../typeinfo/RenamingProxyTypeInfo.scala | 109 ----
.../expressions/typeinfo/RowSerializer.scala | 121 ----
.../api/expressions/typeinfo/RowTypeInfo.scala | 51 --
.../api/java/expressions/ExpressionUtil.scala | 112 ----
.../scala/expressions/DataSetConversions.scala | 66 --
.../expressions/DataStreamConversions.scala | 65 --
.../scala/expressions/JavaBatchTranslator.scala | 392 ------------
.../expressions/JavaStreamingTranslator.scala | 303 ---------
.../expressions/ScalaBatchTranslator.scala | 55 --
.../expressions/ScalaStreamingTranslator.scala | 56 --
.../api/scala/expressions/expressionDsl.scala | 124 ----
.../flink/api/scala/expressions/package.scala | 102 ---
.../examples/scala/PageRankExpression.scala | 210 ------
.../scala/StreamingExpressionFilter.scala | 90 ---
.../examples/scala/TPCHQuery3Expression.scala | 174 -----
.../expressions/test/AggregationsITCase.java | 210 ------
.../api/java/expressions/test/AsITCase.java | 158 -----
.../java/expressions/test/CastingITCase.java | 130 ----
.../expressions/test/ExpressionsITCase.java | 192 ------
.../api/java/expressions/test/FilterITCase.java | 130 ----
.../test/GroupedAggregationsITCase.java | 126 ----
.../api/java/expressions/test/JoinITCase.java | 202 ------
.../api/java/expressions/test/SelectITCase.java | 169 -----
.../test/StringExpressionsITCase.java | 144 -----
.../test/PageRankExpressionITCase.java | 100 ---
.../expressions/test/AggregationsITCase.scala | 127 ----
.../api/scala/expressions/test/AsITCase.scala | 124 ----
.../scala/expressions/test/CastingITCase.scala | 92 ---
.../expressions/test/ExpressionsITCase.scala | 127 ----
.../scala/expressions/test/FilterITCase.scala | 151 -----
.../test/GroupedAggreagationsITCase.scala | 96 ---
.../api/scala/expressions/test/JoinITCase.scala | 145 -----
.../scala/expressions/test/SelectITCase.scala | 143 -----
.../test/StringExpressionsITCase.scala | 98 ---
.../flink/streaming/api/scala/DataStream.scala | 5 +
flink-staging/flink-table/pom.xml | 246 +++++++
.../flink/api/java/table/package-info.java | 60 ++
.../apache/flink/api/table/package-info.java | 33 +
.../flink/examples/java/JavaTableExample.java | 71 +++
.../api/java/table/JavaBatchTranslator.scala | 319 ++++++++++
.../java/table/JavaStreamingTranslator.scala | 236 +++++++
.../flink/api/java/table/TableEnvironment.scala | 112 ++++
.../api/scala/table/DataSetConversions.scala | 67 ++
.../api/scala/table/DataStreamConversions.scala | 68 ++
.../api/scala/table/ScalaBatchTranslator.scala | 68 ++
.../scala/table/ScalaStreamingTranslator.scala | 58 ++
.../flink/api/scala/table/expressionDsl.scala | 124 ++++
.../apache/flink/api/scala/table/package.scala | 101 +++
.../flink/api/table/ExpressionException.scala | 23 +
.../scala/org/apache/flink/api/table/Row.scala | 38 ++
.../org/apache/flink/api/table/Table.scala | 243 +++++++
.../flink/api/table/analysis/Analyzer.scala | 38 ++
.../table/analysis/ExtractEquiJoinFields.scala | 70 ++
.../api/table/analysis/GroupByAnalyzer.scala | 48 ++
.../api/table/analysis/InsertAutoCasts.scala | 91 +++
.../api/table/analysis/PredicateAnalyzer.scala | 32 +
.../table/analysis/ResolveFieldReferences.scala | 57 ++
.../apache/flink/api/table/analysis/Rule.scala | 30 +
.../api/table/analysis/SelectionAnalyzer.scala | 33 +
.../flink/api/table/analysis/TypeCheck.scala | 56 ++
.../api/table/analysis/VerifyBoolean.scala | 40 ++
.../api/table/analysis/VerifyNoAggregates.scala | 51 ++
.../analysis/VerifyNoNestedAggregates.scala | 52 ++
.../table/codegen/ExpressionCodeGenerator.scala | 635 +++++++++++++++++++
.../table/codegen/GenerateBinaryPredicate.scala | 73 +++
.../codegen/GenerateBinaryResultAssembler.scala | 60 ++
.../table/codegen/GenerateResultAssembler.scala | 99 +++
.../table/codegen/GenerateUnaryPredicate.scala | 67 ++
.../codegen/GenerateUnaryResultAssembler.scala | 57 ++
.../flink/api/table/codegen/package.scala | 25 +
.../table/operations/ExpandAggregations.scala | 144 +++++
.../api/table/operations/TableTranslator.scala | 158 +++++
.../flink/api/table/operations/operations.scala | 101 +++
.../flink/api/table/operations/package.scala | 24 +
.../org/apache/flink/api/table/package.scala | 34 +
.../api/table/parser/ExpressionParser.scala | 209 ++++++
.../runtime/ExpressionAggregateFunction.scala | 72 +++
.../runtime/ExpressionFilterFunction.scala | 47 ++
.../table/runtime/ExpressionJoinFunction.scala | 76 +++
.../runtime/ExpressionSelectFunction.scala | 51 ++
.../flink/api/table/runtime/package.scala | 23 +
.../flink/api/table/tree/Expression.scala | 149 +++++
.../flink/api/table/tree/aggregations.scala | 99 +++
.../flink/api/table/tree/arithmetic.scala | 145 +++++
.../org/apache/flink/api/table/tree/cast.scala | 24 +
.../flink/api/table/tree/comparison.scala | 93 +++
.../flink/api/table/tree/fieldExpression.scala | 41 ++
.../apache/flink/api/table/tree/literals.scala | 40 ++
.../org/apache/flink/api/table/tree/logic.scala | 58 ++
.../apache/flink/api/table/tree/package.scala | 29 +
.../api/table/tree/stringExpressions.scala | 46 ++
.../api/table/typeinfo/RenameOperator.scala | 36 ++
.../table/typeinfo/RenamingProxyTypeInfo.scala | 109 ++++
.../api/table/typeinfo/RowSerializer.scala | 121 ++++
.../flink/api/table/typeinfo/RowTypeInfo.scala | 51 ++
.../examples/scala/PageRankExpression.scala | 210 ++++++
.../scala/StreamingExpressionFilter.scala | 90 +++
.../examples/scala/TPCHQuery3Expression.scala | 174 +++++
.../api/java/table/test/AggregationsITCase.java | 215 +++++++
.../flink/api/java/table/test/AsITCase.java | 165 +++++
.../api/java/table/test/CastingITCase.java | 133 ++++
.../api/java/table/test/ExpressionsITCase.java | 197 ++++++
.../flink/api/java/table/test/FilterITCase.java | 133 ++++
.../table/test/GroupedAggregationsITCase.java | 131 ++++
.../flink/api/java/table/test/JoinITCase.java | 216 +++++++
.../flink/api/java/table/test/SelectITCase.java | 180 ++++++
.../table/test/StringExpressionsITCase.java | 150 +++++
.../table/test/PageRankExpressionITCase.java | 100 +++
.../scala/table/test/AggregationsITCase.scala | 127 ++++
.../flink/api/scala/table/test/AsITCase.scala | 124 ++++
.../api/scala/table/test/CastingITCase.scala | 92 +++
.../scala/table/test/ExpressionsITCase.scala | 127 ++++
.../api/scala/table/test/FilterITCase.scala | 151 +++++
.../table/test/GroupedAggreagationsITCase.scala | 96 +++
.../flink/api/scala/table/test/JoinITCase.scala | 145 +++++
.../api/scala/table/test/SelectITCase.scala | 143 +++++
.../table/test/StringExpressionsITCase.scala | 98 +++
flink-staging/pom.xml | 2 +-
166 files changed, 8727 insertions(+), 8523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/docs/linq.md
----------------------------------------------------------------------
diff --git a/docs/linq.md b/docs/linq.md
index ebb0063..79fe6f2 100644
--- a/docs/linq.md
+++ b/docs/linq.md
@@ -1,5 +1,5 @@
---
-title: "Language-Integrated Queries"
+title: "Language-Integrated Queries (Table API)"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -23,58 +23,92 @@ under the License.
* This will be replaced by the TOC
{:toc}
-**Language-Integrated Queries are an experimental feature and can currently only be used with
-the Scala API**
+**Language-Integrated Queries are an experimental feature**
-Flink provides an API that allows specifying operations using SQL-like expressions.
-This Expression API can be enabled by importing
-`org.apache.flink.api.scala.expressions._`. This enables implicit conversions that allow
-converting a `DataSet` or `DataStream` to an `ExpressionOperation` on which relational queries
-can be specified. This example shows how a `DataSet` can be converted, how expression operations
-can be specified and how an expression operation can be converted back to a `DataSet`:
+Flink provides an API that allows specifying operations using SQL-like expressions. Instead of
+manipulating `DataSet` or `DataStream` you work with `Table` on which relational operations can
+be performed.
+
+The following dependency must be added to your project when using the Table API:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table</artifactId>
+ <version>{{site.FLINK_VERSION_SHORT }}</version>
+</dependency>
+{% endhighlight %}
+
+## Scala Table API
+
+The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
+implicit conversions that allow
+converting a DataSet or DataStream to a Table. This example shows how a DataSet can
+be converted, how relational queries can be specified and how a Table can be
+converted back to a DataSet:
{% highlight scala %}
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.table._
case class WC(word: String, count: Int)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-val expr = input.toExpression
-val result = expr.groupBy('word).select('word, 'count.sum).as[WC]
+val expr = input.toTable
+val result = expr.groupBy('word).select('word, 'count.sum).toSet[WC]
{% endhighlight %}
The expression DSL uses Scala symbols to refer to field names and we use code generation to
-transform expressions to efficient runtime code. Please not that the conversion to and from
-expression operations only works when using Scala case classes or Flink POJOs. Please check out
+transform expressions to efficient runtime code. Please note that the conversion to and from
+Tables only works when using Scala case classes or Flink POJOs. Please check out
the [programming guide](programming_guide.html) to learn the requirements for a class to be
considered a POJO.
This is another example that shows how you
-can join to operations:
+can join to Tables:
{% highlight scala %}
case class MyResult(a: String, b: Int)
val input1 = env.fromElements(...).as('a, 'b)
val input2 = env.fromElements(...).as('c, 'd)
-val joined = input1.join(input2).where('b == 'a && 'd > 42).select('a, 'd).as[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").as[MyResult]
{% endhighlight %}
-Notice, how a `DataSet` can be converted to an expression operation by using `as` and specifying new
-names for the fields. This can also be used to disambiguate fields before a join operation.
+Notice, how a DataSet can be converted to a Table by using `as` and specifying new
+names for the fields. This can also be used to disambiguate fields before a join operation. Also,
+in this example we see that you can also use Strings to specify relational expressions.
-The Expression API can be used with the Streaming API, since we also have implicit conversions to
-and from `DataStream`.
+Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
+description of the expression syntax.
-The following dependency must be added to your project when using the Expression API:
+## Java Table API
-{% highlight xml %}
-<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-expressions</artifactId>
- <version>{{site.FLINK_VERSION_SHORT }}</version>
-</dependency>
+When using Java, Tables can be converted to and from DataSet and DataStream using `TableEnvironment`.
+This example is equivalent to the above Scala Example:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+TableEnvironment tableEnv = new TableEnvironment();
+
+DataSet<WC> input = env.fromElements(
+ new WC("Hello", 1),
+ new WC("Ciao", 1),
+ new WC("Hello", 1));
+
+Table table = tableEnv.toTable(input);
+
+Table filtered = table
+ .groupBy("word")
+ .select("word.count as count, word")
+ .filter("count = 2");
+
+DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
{% endhighlight %}
-Please refer to the scaladoc for a full list of supported operations and a description of the
+When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
+are supported. They support exactly the same feature set as the expression DSL.
+
+Please refer to the Javadoc for a full list of supported operations and a description of the
expression syntax.
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 2732112..de07a57 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -90,7 +90,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
* Returns the TypeInformation for the elements of this DataSet.
*/
- def getType: TypeInformation[T] = set.getType
+ def getType(): TypeInformation[T] = set.getType
/**
* Returns the execution environment associated with the current DataSet.
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/pom.xml b/flink-staging/flink-expressions/pom.xml
deleted file mode 100644
index f26ab03..0000000
--- a/flink-staging/flink-expressions/pom.xml
+++ /dev/null
@@ -1,246 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-staging</artifactId>
- <version>0.9-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-expressions</artifactId>
- <name>flink-expressions</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala-examples</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
- scala classes can be resolved later in the (Java) test-compile phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- <compilerPlugins combine.children="append">
- <compilerPlugin>
- <groupId>org.scalamacros</groupId>
- <artifactId>paradise_${scala.version}</artifactId>
- <version>${scala.macros.version}</version>
- </compilerPlugin>
- </compilerPlugins>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <version>0.5.0</version>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <verbose>false</verbose>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <failOnWarning>false</failOnWarning>
- <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
- <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
- <outputEncoding>UTF-8</outputEncoding>
- </configuration>
- </plugin>
-
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>scala-2.10</id>
- <activation>
- <property>
- <!-- this is the default scala profile -->
- <name>!scala-2.11</name>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.scalamacros</groupId>
- <artifactId>quasiquotes_${scala.binary.version}</artifactId>
- <version>${scala.macros.version}</version>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
deleted file mode 100644
index 07e18b2..0000000
--- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package doc wohoooo
- */
-package org.apache.flink.api.java.expressions;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
deleted file mode 100644
index 42632f9..0000000
--- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-
-/**
- * Very simple example that shows how the Java Expression API can be used.
- */
-public class JavaExpressionExample {
-
- public static class WC {
- public String word;
- public int count;
-
- public WC() {
-
- }
-
- public WC(String word, int count) {
- this.word = word;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "WC " + word + " " + count;
- }
- }
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-
- DataSet<WC> input = env.fromElements(
- new WC("Hello", 1),
- new WC("Ciao", 1),
- new WC("Hello", 1));
-
- ExpressionOperation expr = ExpressionUtil.from(input);
-
- ExpressionOperation filtered = expr
- .groupBy("word")
- .select("word.count as count, word")
- .filter("count = 2");
-
- DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class);
-
- result.print();
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
deleted file mode 100644
index 34e400f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * Exception for all errors occurring during expression operations.
- */
-class ExpressionException(msg: String) extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
deleted file mode 100644
index 38417b2..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer,
-PredicateAnalyzer}
-import org.apache.flink.api.expressions.operations._
-import org.apache.flink.api.expressions.parser.ExpressionParser
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-
-/**
- * The abstraction for writing expression API programs. Similar to how the batch and streaming APIs
- * have [[org.apache.flink.api.scala.DataSet]] and
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- *
- * Use the methods of [[ExpressionOperation]] to transform data or to revert back to the underlying
- * batch or streaming representation.
- */
-case class ExpressionOperation[A <: OperationTranslator](
- private[flink] val operation: Operation,
- private[flink] val operationTranslator: A) {
-
-
- /**
- * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- */
- def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] = {
- operationTranslator.translate(operation)
- }
-
- /**
- * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
- * can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
- * }}}
- */
- def select(fields: Expression*): ExpressionOperation[A] = {
- val analyzer = new SelectionAnalyzer(operation.outputFields)
- val analyzedFields = fields.map(analyzer.analyze)
- val fieldNames = analyzedFields map(_.name)
- if (fieldNames.toSet.size != fieldNames.size) {
- throw new ExpressionException(s"Resulting fields names are not unique in expression" +
- s""" "${fields.mkString(", ")}".""")
- }
- this.copy(operation = Select(operation, analyzedFields))
- }
-
- /**
- * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
- * can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * in.select("key, value.avg + " The average" as average, other.substring(0, 10)")
- * }}}
- */
- def select(fields: String): ExpressionOperation[A] = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- select(fieldExprs: _*)
- }
-
- /**
- * Renames the fields of the expression result. Use this to disambiguate fields before
- * joining to operations.
- *
- * Example:
- *
- * {{{
- * in.as('a, 'b)
- * }}}
- */
- def as(fields: Expression*): ExpressionOperation[A] = {
- fields forall {
- f => f.isInstanceOf[UnresolvedFieldReference]
- } match {
- case true =>
- case false => throw new ExpressionException("Only field expression allowed in as().")
- }
- this.copy(operation = As(operation, fields.toArray map { _.name }))
- }
-
- /**
- * Renames the fields of the expression result. Use this to disambiguate fields before
- * joining to operations.
- *
- * Example:
- *
- * {{{
- * in.as("a, b")
- * }}}
- */
- def as(fields: String): ExpressionOperation[A] = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- as(fieldExprs: _*)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * in.filter('name === "Fred")
- * }}}
- */
- def filter(predicate: Expression): ExpressionOperation[A] = {
- val analyzer = new PredicateAnalyzer(operation.outputFields)
- val analyzedPredicate = analyzer.analyze(predicate)
- this.copy(operation = Filter(operation, analyzedPredicate))
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * in.filter("name === 'Fred'")
- * }}}
- */
- def filter(predicate: String): ExpressionOperation[A] = {
- val predicateExpr = ExpressionParser.parseExpression(predicate)
- filter(predicateExpr)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * in.filter(name === "Fred")
- * }}}
- */
- def where(predicate: Expression): ExpressionOperation[A] = {
- filter(predicate)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * in.filter("name === 'Fred'")
- * }}}
- */
- def where(predicate: String): ExpressionOperation[A] = {
- filter(predicate)
- }
-
- /**
- * Groups the elements on some grouping keys. Use this before a selection with aggregations
- * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
- *
- * Example:
- *
- * {{{
- * in.groupBy('key).select('key, 'value.avg)
- * }}}
- */
- def groupBy(fields: Expression*): ExpressionOperation[A] = {
- val analyzer = new GroupByAnalyzer(operation.outputFields)
- val analyzedFields = fields.map(analyzer.analyze)
-
- val illegalKeys = analyzedFields filter {
- case fe: ResolvedFieldReference => false // OK
- case e => true
- }
-
- if (illegalKeys.nonEmpty) {
- throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
- }
-
- this.copy(operation = GroupBy(operation, analyzedFields))
- }
-
- /**
- * Groups the elements on some grouping keys. Use this before a selection with aggregations
- * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
- *
- * Example:
- *
- * {{{
- * in.groupBy("key").select("key, value.avg")
- * }}}
- */
- def groupBy(fields: String): ExpressionOperation[A] = {
- val fieldsExpr = ExpressionParser.parseExpressionList(fields)
- groupBy(fieldsExpr: _*)
- }
-
- /**
- * Joins to expression operations. Similar to an SQL join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary. You can use
- * where and select clauses after a join to further specify the behaviour of the join.
- *
- * Example:
- *
- * {{{
- * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
- * }}}
- */
- def join(right: ExpressionOperation[A]): ExpressionOperation[A] = {
- val leftInputNames = operation.outputFields.map(_._1).toSet
- val rightInputNames = right.operation.outputFields.map(_._1).toSet
- if (leftInputNames.intersect(rightInputNames).nonEmpty) {
- throw new ExpressionException(
- "Overlapping fields names on join input, result would be ambiguous: " +
- operation.outputFields.mkString(", ") +
- " and " +
- right.operation.outputFields.mkString(", ") )
- }
- this.copy(operation = Join(operation, right.operation))
- }
-
- override def toString: String = s"Expression($operation)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
deleted file mode 100644
index 47ef59e..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * This is used for executing expression operations. We use manually generated
- * TypeInfo to check the field types and create serializers and comparators.
- */
-class Row(arity: Int) extends Product {
-
- private val fields = new Array[Any](arity)
-
- def productArity = fields.length
-
- def productElement(i: Int): Any = fields(i)
-
- def setField(i: Int, value: Any): Unit = fields(i) = value
-
- def canEqual(that: Any) = false
-
- override def toString = fields.mkString(",")
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
deleted file mode 100644
index da71cdd..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-
-/**
- * Base class for expression analyzers/transformers. Analyzers must implement method `rules` to
- * provide the chain of rules that are invoked one after another. The expression resulting
- * from one rule is fed into the next rule and the final result is returned from method `analyze`.
- */
-abstract class Analyzer {
-
- def rules: Seq[Rule]
-
- final def analyze(expr: Expression): Expression = {
- var currentTree = expr
- for (rule <- rules) {
- currentTree = rule(currentTree)
- }
- currentTree
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
deleted file mode 100644
index a4f8f25..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import scala.collection.mutable
-
-/**
- * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
- * expression without the equi-join predicates together with indices of the join fields
- * from both the left and right input.
- */
-object ExtractEquiJoinFields {
- def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
-
- val joinFieldsLeft = mutable.MutableList[Int]()
- val joinFieldsRight = mutable.MutableList[Int]()
-
- val equiJoinExprs = mutable.MutableList[EqualTo]()
- // First get all `===` expressions that are not below an `Or`
- predicate.transformPre {
- case or@Or(_, _) => NopExpression()
- case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
- if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
- joinFieldsLeft += leftType.getFieldIndex(le.name)
- joinFieldsRight += rightType.getFieldIndex(re.name)
- } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
- joinFieldsLeft += leftType.getFieldIndex(re.name)
- joinFieldsRight += rightType.getFieldIndex(le.name)
- } else {
- // not an equi-join predicate
- }
- equiJoinExprs += eq
- eq
- }
-
- // then remove the equi join expressions from the predicate
- val resultExpr = predicate.transformPost {
- // For OR, we can eliminate the OR since the equi join
- // predicate is evaluated before the expression is evaluated
- case or@Or(NopExpression(), _) => NopExpression()
- case or@Or(_, NopExpression()) => NopExpression()
- // For AND we replace it with the other expression, since the
- // equi join predicate will always be true
- case and@And(NopExpression(), other) => other
- case and@And(other, NopExpression()) => other
- case eq : EqualTo if equiJoinExprs.contains(eq) =>
- NopExpression()
- }
-
- (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
deleted file mode 100644
index 21f989c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions._
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.collection.mutable
-
-
-/**
- * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
- */
-class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
-
- def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
-
- object CheckGroupExpression extends Rule {
-
- def apply(expr: Expression) = {
- val errors = mutable.MutableList[String]()
-
- expr match {
- case f: ResolvedFieldReference => // this is OK
- case other =>
- throw new ExpressionException(
- s"""Invalid grouping expression "$expr". Only field references are allowed.""")
- }
- expr
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
deleted file mode 100644
index 319e72f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
-
-/**
- * [[Rule]] that adds casts in arithmetic operations.
- */
-class InsertAutoCasts extends Rule {
-
- def apply(expr: Expression) = {
- val result = expr.transformPost {
-
- case plus@Plus(o1, o2) =>
- // Plus is special case since we can cast anything to String for String concat
- if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
- if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- Plus(Cast(o1, o2.typeInfo), o2)
- } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- Plus(o1, Cast(o2, o1.typeInfo))
- } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
- Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
- } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
- Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
- } else {
- plus
- }
- } else {
- plus
- }
-
- case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
- ba.isInstanceOf[BinaryComparison] =>
- val o1 = ba.left
- val o2 = ba.right
- if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
- if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
- } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
- } else {
- ba
- }
- } else {
- ba
- }
-
- case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
- val o1 = ba.left
- val o2 = ba.right
- if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
- o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
- if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
- } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
- o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
- ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
- } else {
- ba
- }
- } else {
- ba
- }
- }
-
- result
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
deleted file mode 100644
index f108f5c..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * Analyzer for unary predicates, i.e. filter operations.
- */
-class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
- def rules = Seq(
- new ResolveFieldReferences(inputFields),
- new InsertAutoCasts,
- new TypeCheck,
- new VerifyNoAggregates,
- new VerifyBoolean)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
deleted file mode 100644
index 693dd88..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.expressions._
-
-import scala.collection.mutable
-
-/**
- * Rule that resolved field references. This rule verifies that field references point to existing
- * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
- * [[TypeInformation]] in addition to the field name.
- */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule {
-
- def apply(expr: Expression) = {
- val errors = mutable.MutableList[String]()
-
- val result = expr.transformPost {
- case fe@UnresolvedFieldReference(fieldName) =>
- inputFields.find { _._1 == fieldName } match {
- case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
-
- case None =>
- errors +=
- s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
- fe
- }
- }
-
- if (errors.length > 0) {
- throw new ExpressionException(
- s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
- }
-
- result
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
deleted file mode 100644
index 853ee7a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-
-/**
- * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `rule` gets on
- * [[Expression]] and must return an expression. The returned [[Expression]] can also be
- * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one
- * [[Rule]] is fed into the next [[Rule]] in the chain.
- */
-abstract class Rule {
- def apply(expr: Expression): Expression
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
deleted file mode 100644
index eca007f..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * This analyzes selection expressions.
- */
-class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
-
- def rules = Seq(
- new ResolveFieldReferences(inputFields),
- new VerifyNoNestedAggregates,
- new InsertAutoCasts,
- new TypeCheck)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
deleted file mode 100644
index 632daa3..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.expressions.{_}
-
-import scala.collection.mutable
-
-/**
- * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
- * Expressions are expected to perform type verification in this method.
- */
-class TypeCheck extends Rule {
-
- def apply(expr: Expression) = {
- val errors = mutable.MutableList[String]()
-
- val result = expr.transformPre {
- case expr: Expression=> {
- // simply get the typeInfo from the expression. this will perform type analysis
- try {
- expr.typeInfo
- } catch {
- case e: ExpressionException =>
- errors += e.getMessage
- }
- expr
- }
- }
-
- if (errors.length > 0) {
- throw new ExpressionException(
- s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
- }
-
- result
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
deleted file mode 100644
index d0bd6b6..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
-import org.apache.flink.api.expressions.{_}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-import scala.collection.mutable
-
-/**
- * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
- * for filter/join predicates.
- */
-class VerifyBoolean extends Rule {
-
- def apply(expr: Expression) = {
- if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
- }
-
- expr
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
deleted file mode 100644
index e9f8788..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.expressions.tree.{Aggregation, Expression}
-
-import scala.collection.mutable
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations. Right now, join
- * predicates and filter predicates cannot contain aggregates.
- */
-class VerifyNoAggregates extends Rule {
-
- def apply(expr: Expression) = {
- val errors = mutable.MutableList[String]()
-
- val result = expr.transformPre {
- case agg: Aggregation=> {
- errors +=
- s"""Aggregations are not allowed in join/filter predicates."""
- agg
- }
- }
-
- if (errors.length > 0) {
- throw new ExpressionException(
- s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
- }
-
- result
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
deleted file mode 100644
index de5063a..0000000
--- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.expressions.analysis
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.expressions.tree.{Expression, Aggregation}
-
-import scala.collection.mutable
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations
- * as children of aggregate operations.
- */
-class VerifyNoNestedAggregates extends Rule {
-
- def apply(expr: Expression) = {
- val errors = mutable.MutableList[String]()
-
- val result = expr.transformPre {
- case agg: Aggregation=> {
- if (agg.child.exists(_.isInstanceOf[Aggregation])) {
- errors += s"""Found nested aggregation inside "$agg"."""
- }
- agg
- }
- }
-
- if (errors.length > 0) {
- throw new ExpressionException(
- s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
- }
-
- result
-
- }
-}