You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:56 UTC
[27/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
new file mode 100644
index 0000000..0634f0b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.calcite.avatica.util.DateTimeUtils._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions._
+import java.math.{BigDecimal => JBigDecimal}
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST for expression
+ * operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.table.expressions.ExpressionParser]].
+ */
+trait ImplicitExpressionOperations {
+ private[flink] def expr: Expression
+
+ /**
+ * Enables literals on left side of binary expressions.
+ *
+ * e.g. 12.toExpr % 'a
+ *
+ * @return expression
+ */
+ def toExpr: Expression = expr
+
+ def && (other: Expression) = And(expr, other)
+ def || (other: Expression) = Or(expr, other)
+
+ def > (other: Expression) = GreaterThan(expr, other)
+ def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+ def < (other: Expression) = LessThan(expr, other)
+ def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+ def === (other: Expression) = EqualTo(expr, other)
+ def !== (other: Expression) = NotEqualTo(expr, other)
+
+ def unary_! = Not(expr)
+ def unary_- = UnaryMinus(expr)
+
+ def isNull = IsNull(expr)
+ def isNotNull = IsNotNull(expr)
+
+ /**
+ * Returns true if given boolean expression is true. False otherwise (for null and false).
+ */
+ def isTrue = IsTrue(expr)
+
+ /**
+ * Returns true if given boolean expression is false. False otherwise (for null and true).
+ */
+ def isFalse = IsFalse(expr)
+
+ /**
+ * Returns true if given boolean expression is not true (for null and false). False otherwise.
+ */
+ def isNotTrue = IsNotTrue(expr)
+
+ /**
+ * Returns true if given boolean expression is not false (for null and true). False otherwise.
+ */
+ def isNotFalse = IsNotFalse(expr)
+
+ def + (other: Expression) = Plus(expr, other)
+ def - (other: Expression) = Minus(expr, other)
+ def / (other: Expression) = Div(expr, other)
+ def * (other: Expression) = Mul(expr, other)
+ def % (other: Expression) = mod(other)
+
+ def sum = Sum(expr)
+ def min = Min(expr)
+ def max = Max(expr)
+ def count = Count(expr)
+ def avg = Avg(expr)
+
+ def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+ /**
+ * Specifies a name for an expression i.e. a field.
+ *
+ * @param name name for one field
+ * @param extraNames additional names if the expression expands to multiple fields
+ * @return field with an alias
+ */
+ def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
+
+ def asc = Asc(expr)
+ def desc = Desc(expr)
+
+ /**
+ * Returns the start time of a window when applied on a window reference.
+ */
+ def start = WindowStart(expr)
+
+ /**
+ * Returns the end time of a window when applied on a window reference.
+ */
+ def end = WindowEnd(expr)
+
+ /**
+ * Ternary conditional operator that decides which of two other expressions should be evaluated
+ * based on a evaluated boolean condition.
+ *
+ * e.g. (42 > 5).?("A", "B") leads to "A"
+ *
+ * @param ifTrue expression to be evaluated if condition holds
+ * @param ifFalse expression to be evaluated if condition does not hold
+ */
+ def ?(ifTrue: Expression, ifFalse: Expression) = {
+ If(expr, ifTrue, ifFalse)
+ }
+
+ // scalar functions
+
+ /**
+ * Calculates the remainder of division the given number by another one.
+ */
+ def mod(other: Expression) = Mod(expr, other)
+
+ /**
+ * Calculates the Euler's number raised to the given power.
+ */
+ def exp() = Exp(expr)
+
+ /**
+ * Calculates the base 10 logarithm of given value.
+ */
+ def log10() = Log10(expr)
+
+ /**
+ * Calculates the natural logarithm of given value.
+ */
+ def ln() = Ln(expr)
+
+ /**
+ * Calculates the given number raised to the power of the other value.
+ */
+ def power(other: Expression) = Power(expr, other)
+
+ /**
+ * Calculates the square root of a given value.
+ */
+ def sqrt() = Sqrt(expr)
+
+ /**
+ * Calculates the absolute value of given value.
+ */
+ def abs() = Abs(expr)
+
+ /**
+ * Calculates the largest integer less than or equal to a given number.
+ */
+ def floor() = Floor(expr)
+
+ /**
+ * Calculates the smallest integer greater than or equal to a given number.
+ */
+ def ceil() = Ceil(expr)
+
+ // String operations
+
+ /**
+ * Creates a substring of the given string at given index for a given length.
+ *
+ * @param beginIndex first character of the substring (starting at 1, inclusive)
+ * @param length number of characters of the substring
+ * @return substring
+ */
+ def substring(beginIndex: Expression, length: Expression) =
+ Substring(expr, beginIndex, length)
+
+ /**
+ * Creates a substring of the given string beginning at the given index to the end.
+ *
+ * @param beginIndex first character of the substring (starting at 1, inclusive)
+ * @return substring
+ */
+ def substring(beginIndex: Expression) =
+ new Substring(expr, beginIndex)
+
+ /**
+ * Removes leading and/or trailing characters from the given string.
+ *
+ * @param removeLeading if true, remove leading characters (default: true)
+ * @param removeTrailing if true, remove trailing characters (default: true)
+ * @param character string containing the character (default: " ")
+ * @return trimmed string
+ */
+ def trim(
+ removeLeading: Boolean = true,
+ removeTrailing: Boolean = true,
+ character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
+ if (removeLeading && removeTrailing) {
+ Trim(TrimMode.BOTH, character, expr)
+ } else if (removeLeading) {
+ Trim(TrimMode.LEADING, character, expr)
+ } else if (removeTrailing) {
+ Trim(TrimMode.TRAILING, character, expr)
+ } else {
+ expr
+ }
+ }
+
+ /**
+ * Returns the length of a string.
+ */
+ def charLength() = CharLength(expr)
+
+ /**
+ * Returns all of the characters in a string in upper case using the rules of
+ * the default locale.
+ */
+ def upperCase() = Upper(expr)
+
+ /**
+ * Returns all of the characters in a string in lower case using the rules of
+ * the default locale.
+ */
+ def lowerCase() = Lower(expr)
+
+ /**
+ * Converts the initial letter of each word in a string to uppercase.
+ * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
+ */
+ def initCap() = InitCap(expr)
+
+ /**
+ * Returns true, if a string matches the specified LIKE pattern.
+ *
+ * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
+ */
+ def like(pattern: Expression) = Like(expr, pattern)
+
+ /**
+ * Returns true, if a string matches the specified SQL regex pattern.
+ *
+ * e.g. "A+" matches all strings that consist of at least one A
+ */
+ def similar(pattern: Expression) = Similar(expr, pattern)
+
+ /**
+ * Returns the position of string in an other string starting at 1.
+ * Returns 0 if string could not be found.
+ *
+ * e.g. "a".position("bbbbba") leads to 6
+ */
+ def position(haystack: Expression) = Position(expr, haystack)
+
+ /**
+ * Replaces a substring of string with a string starting at a position (starting at 1).
+ *
+ * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
+ */
+ def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting)
+
+ /**
+ * Replaces a substring of string with a string starting at a position (starting at 1).
+ * The length specifies how many characters should be removed.
+ *
+ * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+ */
+ def overlay(newString: Expression, starting: Expression, length: Expression) =
+ Overlay(expr, newString, starting, length)
+
+ // Temporal operations
+
+ /**
+ * Parses a date string in the form "yy-mm-dd" to a SQL Date.
+ */
+ def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
+
+ /**
+ * Parses a time string in the form "hh:mm:ss" to a SQL Time.
+ */
+ def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
+
+ /**
+ * Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
+ */
+ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
+
+ /**
+ * Extracts parts of a time point or time interval. Returns the part as a long value.
+ *
+ * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
+ */
+ def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
+
+ /**
+ * Returns the quarter of a year from a SQL date.
+ *
+ * e.g. "1994-09-27".toDate.quarter() leads to 3
+ */
+ def quarter() = Quarter(expr)
+
+ /**
+ * Rounds down a time point to the given unit.
+ *
+ * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
+ */
+ def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
+
+ /**
+ * Rounds up a time point to the given unit.
+ *
+ * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
+ */
+ def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
+
+ // Interval types
+
+ /**
+ * Creates an interval of the given number of years.
+ *
+ * @return interval of months
+ */
+ def year = toMonthInterval(expr, 12)
+
+ /**
+ * Creates an interval of the given number of years.
+ *
+ * @return interval of months
+ */
+ def years = year
+
+ /**
+ * Creates an interval of the given number of months.
+ *
+ * @return interval of months
+ */
+ def month = toMonthInterval(expr, 1)
+
+ /**
+ * Creates an interval of the given number of months.
+ *
+ * @return interval of months
+ */
+ def months = month
+
+ /**
+ * Creates an interval of the given number of days.
+ *
+ * @return interval of milliseconds
+ */
+ def day = toMilliInterval(expr, MILLIS_PER_DAY)
+
+ /**
+ * Creates an interval of the given number of days.
+ *
+ * @return interval of milliseconds
+ */
+ def days = day
+
+ /**
+ * Creates an interval of the given number of hours.
+ *
+ * @return interval of milliseconds
+ */
+ def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
+
+ /**
+ * Creates an interval of the given number of hours.
+ *
+ * @return interval of milliseconds
+ */
+ def hours = hour
+
+ /**
+ * Creates an interval of the given number of minutes.
+ *
+ * @return interval of milliseconds
+ */
+ def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+ /**
+ * Creates an interval of the given number of minutes.
+ *
+ * @return interval of milliseconds
+ */
+ def minutes = minute
+
+ /**
+ * Creates an interval of the given number of seconds.
+ *
+ * @return interval of milliseconds
+ */
+ def second = toMilliInterval(expr, MILLIS_PER_SECOND)
+
+ /**
+ * Creates an interval of the given number of seconds.
+ *
+ * @return interval of milliseconds
+ */
+ def seconds = second
+
+ /**
+ * Creates an interval of the given number of milliseconds.
+ *
+ * @return interval of milliseconds
+ */
+ def milli = toMilliInterval(expr, 1)
+
+ /**
+ * Creates an interval of the given number of milliseconds.
+ *
+ * @return interval of milliseconds
+ */
+ def millis = milli
+
+ // row interval type
+
+ /**
+ * Creates an interval of rows.
+ *
+ * @return interval of rows
+ */
+ def rows = toRowInterval(expr)
+
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+ * returns it's value.
+ *
+ * @param name name of the field (similar to Flink's field expressions)
+ * @return value of the field
+ */
+ def get(name: String) = GetCompositeField(expr, name)
+
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+ * returns it's value.
+ *
+ * @param index position of the field
+ * @return value of the field
+ */
+ def get(index: Int) = GetCompositeField(expr, index)
+
+ /**
+ * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+ * into a flat representation where every subtype is a separate field.
+ */
+ def flatten() = Flattening(expr)
+
+ /**
+ * Accesses the element of an array based on an index (starting at 1).
+ *
+ * @param index position of the element (starting at 1)
+ * @return value of the element
+ */
+ def at(index: Expression) = ArrayElementAt(expr, index)
+
+ /**
+ * Returns the number of elements of an array.
+ *
+ * @return number of elements
+ */
+ def cardinality() = ArrayCardinality(expr)
+
+ /**
+ * Returns the sole element of an array with a single element. Returns null if the array is
+ * empty. Throws an exception if the array has more than one element.
+ *
+ * @return the first and only element of an array with a single element
+ */
+ def element() = ArrayElement(expr)
+}
+
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+ implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+ def expr = e
+ }
+
+ implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations {
+ def expr = UnresolvedFieldReference(s.name)
+ }
+
+ implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+ def expr = Literal(l)
+ }
+
+ implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations {
+ def expr = Literal(b)
+ }
+
+ implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations {
+ def expr = Literal(s)
+ }
+
+ implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+ def expr = Literal(i)
+ }
+
+ implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+ def expr = Literal(f)
+ }
+
+ implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+ def expr = Literal(d)
+ }
+
+ implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+ def expr = Literal(str)
+ }
+
+ implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+ def expr = Literal(bool)
+ }
+
+ implicit class LiteralJavaDecimalExpression(javaDecimal: java.math.BigDecimal)
+ extends ImplicitExpressionOperations {
+ def expr = Literal(javaDecimal)
+ }
+
+ implicit class LiteralScalaDecimalExpression(scalaDecimal: scala.math.BigDecimal)
+ extends ImplicitExpressionOperations {
+ def expr = Literal(scalaDecimal.bigDecimal)
+ }
+
+ implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations {
+ def expr = Literal(sqlDate)
+ }
+
+ implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations {
+ def expr = Literal(sqlTime)
+ }
+
+ implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp)
+ extends ImplicitExpressionOperations {
+ def expr = Literal(sqlTimestamp)
+ }
+
+ implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+ implicit def byte2Literal(b: Byte): Expression = Literal(b)
+ implicit def short2Literal(s: Short): Expression = Literal(s)
+ implicit def int2Literal(i: Int): Expression = Literal(i)
+ implicit def long2Literal(l: Long): Expression = Literal(l)
+ implicit def double2Literal(d: Double): Expression = Literal(d)
+ implicit def float2Literal(d: Float): Expression = Literal(d)
+ implicit def string2Literal(str: String): Expression = Literal(str)
+ implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+ implicit def javaDec2Literal(javaDec: JBigDecimal): Expression = Literal(javaDec)
+ implicit def scalaDec2Literal(scalaDec: BigDecimal): Expression =
+ Literal(scalaDec.bigDecimal)
+ implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
+ implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
+ implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression =
+ Literal(sqlTimestamp)
+ implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Expressions with no parameters
+// ------------------------------------------------------------------------------------------------
+
+// we disable the object checker here as it checks for capital letters of objects
+// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
+// scalastyle:off object.name
+
+/**
+ * Returns the current SQL date in UTC time zone.
+ */
+object currentDate {
+
+ /**
+ * Returns the current SQL date in UTC time zone.
+ */
+ def apply(): Expression = {
+ CurrentDate()
+ }
+}
+
+/**
+ * Returns the current SQL time in UTC time zone.
+ */
+object currentTime {
+
+ /**
+ * Returns the current SQL time in UTC time zone.
+ */
+ def apply(): Expression = {
+ CurrentTime()
+ }
+}
+
+/**
+ * Returns the current SQL timestamp in UTC time zone.
+ */
+object currentTimestamp {
+
+ /**
+ * Returns the current SQL timestamp in UTC time zone.
+ */
+ def apply(): Expression = {
+ CurrentTimestamp()
+ }
+}
+
+/**
+ * Returns the current SQL time in local time zone.
+ */
+object localTime {
+
+ /**
+ * Returns the current SQL time in local time zone.
+ */
+ def apply(): Expression = {
+ LocalTime()
+ }
+}
+
+/**
+ * Returns the current SQL timestamp in local time zone.
+ */
+object localTimestamp {
+
+ /**
+ * Returns the current SQL timestamp in local time zone.
+ */
+ def apply(): Expression = {
+ LocalTimestamp()
+ }
+}
+
+/**
+ * Determines whether two anchored time intervals overlap. Time point and temporal are
+ * transformed into a range defined by two time points (start, end). The function
+ * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
+ *
+ * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+ *
+ * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
+ */
+object temporalOverlaps {
+
+ /**
+ * Determines whether two anchored time intervals overlap. Time point and temporal are
+ * transformed into a range defined by two time points (start, end).
+ *
+ * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+ *
+ * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
+ */
+ def apply(
+ leftTimePoint: Expression,
+ leftTemporal: Expression,
+ rightTimePoint: Expression,
+ rightTemporal: Expression): Expression = {
+ TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+ }
+}
+
+/**
+ * Creates an array of literals. The array will be an array of objects (not primitives).
+ */
+object array {
+
+ /**
+ * Creates an array of literals. The array will be an array of objects (not primitives).
+ */
+ def apply(head: Expression, tail: Expression*): Expression = {
+ ArrayConstructor(head +: tail.toSeq)
+ }
+}
+
+// scalastyle:on object.name
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
new file mode 100644
index 0000000..0e4c1c7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+ * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+ * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+ * elements in 5 minutes intervals.
+ */
+object Tumble {
+
+ /**
+ * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
+ * windows. For example, a tumbling window of 5 minutes size groups
+ * elements in 5 minutes intervals.
+ *
+ * @param size the size of the window as time or row-count interval.
+ * @return a tumbling window
+ */
+ def over(size: Expression): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+ * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+ * a specified slide interval. If the slide interval is smaller than the window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple windows.
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
+ * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+ * window evaluations.
+ */
+object Slide {
+
+ /**
+ * Creates a sliding window. Sliding windows have a fixed size and slide by
+ * a specified slide interval. If the slide interval is smaller than the window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple windows.
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+ * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+ * consecutive
+ *
+ * @param size the size of the window as time or row-count interval
+ * @return a partially specified sliding window
+ */
+ def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+ * Helper object for creating a session window. The boundary of session windows are defined by
+ * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
+ * gap period.
+ */
+object Session {
+
+ /**
+ * Creates a session window. The boundary of session windows are defined by
+ * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
+ * gap period.
+ *
+ * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+ * closing the session window.
+ * @return a session window
+ */
+ def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..cd341cb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.types.Row
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.reflect.ClassTag
+
+/**
+ * == Table API (Scala) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ * import org.apache.flink.table.api.scala._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
+ * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+ * a look at [[Table]] to see which operations are supported and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
+ * expression can be specified.
+ *
+ * When writing a query you can use Scala Symbols to refer to field names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+ * Scala literal to an Expression literal, in those cases use `Literal`, as in `Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ * import org.apache.flink.api.scala._
+ * import org.apache.flink.table.api.scala._
+ *
+ * val env = ExecutionEnvironment.getExecutionEnvironment
+ * val tEnv = TableEnvironment.getTableEnvironment(env)
+ *
+ * val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ * val result = input
+ * .toTable(tEnv, 'word, 'count)
+ * .groupBy('word)
+ * .select('word, 'count.avg)
+ *
+ * result.print()
+ * }}}
+ *
+ */
+package object scala extends ImplicitExpressionConversions {
+
+ implicit def table2TableConversions(table: Table): TableConversions = {
+ new TableConversions(table)
+ }
+
+ implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+ new DataSetConversions[T](set, set.getType())
+ }
+
+ implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+ val tableEnv = table.tableEnv.asInstanceOf[ScalaBatchTableEnv]
+ tableEnv.toDataSet[Row](table)
+ }
+
+ implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+ new DataStreamConversions[T](set, set.dataType.asInstanceOf[CompositeType[T]])
+ }
+
+ implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+ val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
+ tableEnv.toDataStream[Row](table)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
new file mode 100644
index 0000000..6322026
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.logical.Minus
+import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias}
+import org.apache.flink.table.plan.ProjectionTranslator._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.sinks.TableSink
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+ * A Table is the core component of the Table API.
+ * Similar to how the batch and streaming APIs have DataSet and DataStream,
+ * the Table API is built around [[Table]].
+ *
+ * Use the methods of [[Table]] to transform data. Use [[TableEnvironment]] to convert a [[Table]]
+ * back to a DataSet or DataStream.
+ *
+ * When using Scala a [[Table]] can also be converted using implicit conversions.
+ *
+ * Example:
+ *
+ * {{{
+ * val env = ExecutionEnvironment.getExecutionEnvironment
+ * val tEnv = TableEnvironment.getTableEnvironment(env)
+ *
+ * val set: DataSet[(String, Int)] = ...
+ * val table = set.toTable(tEnv, 'a, 'b)
+ * ...
+ * val table2 = ...
+ * val set2: DataSet[MyType] = table2.toDataSet[MyType]
+ * }}}
+ *
+ * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
+ * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
+ * syntax.
+ *
+ * @param tableEnv The [[TableEnvironment]] to which the table is bound.
+ * @param logicalPlan logical representation
+ */
+class Table(
+ private[flink] val tableEnv: TableEnvironment,
+ private[flink] val logicalPlan: LogicalNode) {
+
+ def relBuilder = tableEnv.getRelBuilder
+
+ def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
+
+ /**
+ * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+ * can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.select('key, 'value.avg + " The average" as 'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+ val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
+ val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
+ if (propNames.nonEmpty) {
+ throw ValidationException("Window properties can only be used on windowed tables.")
+ }
+
+ if (aggNames.nonEmpty) {
+ val projectsOnAgg = replaceAggregationsAndProperties(
+ expandedFields, tableEnv, aggNames, propNames)
+ val projectFields = extractFieldReferences(expandedFields)
+
+ new Table(tableEnv,
+ Project(projectsOnAgg,
+ Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+ Project(projectFields, logicalPlan).validate(tableEnv)
+ ).validate(tableEnv)
+ ).validate(tableEnv)
+ )
+ } else {
+ new Table(tableEnv,
+ Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
+ }
+ }
+
+ /**
+ * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+ * can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.select("key, value.avg + ' The average' as average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+
+ /**
+ * Renames the fields of the expression result. Use this to disambiguate fields before
+ * joining to operations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.as('a, 'b)
+ * }}}
+ */
+ def as(fields: Expression*): Table = {
+ new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Renames the fields of the expression result. Use this to disambiguate fields before
+ * joining to operations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.as("a, b")
+ * }}}
+ */
+ def as(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ as(fieldExprs: _*)
+ }
+
+ /**
+ * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+ * clause.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.filter('name === "Fred")
+ * }}}
+ */
+ def filter(predicate: Expression): Table = {
+ new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+ * clause.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.filter("name = 'Fred'")
+ * }}}
+ */
+ def filter(predicate: String): Table = {
+ val predicateExpr = ExpressionParser.parseExpression(predicate)
+ filter(predicateExpr)
+ }
+
+ /**
+ * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+ * clause.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.where('name === "Fred")
+ * }}}
+ */
+ def where(predicate: Expression): Table = {
+ filter(predicate)
+ }
+
+ /**
+ * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+ * clause.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.where("name = 'Fred'")
+ * }}}
+ */
+ def where(predicate: String): Table = {
+ filter(predicate)
+ }
+
+ /**
+ * Groups the elements on some grouping keys. Use this before a selection with aggregations
+ * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupBy('key).select('key, 'value.avg)
+ * }}}
+ */
+ def groupBy(fields: Expression*): GroupedTable = {
+ new GroupedTable(this, fields)
+ }
+
+ /**
+ * Groups the elements on some grouping keys. Use this before a selection with aggregations
+ * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupBy("key").select("key, value.avg")
+ * }}}
+ */
+ def groupBy(fields: String): GroupedTable = {
+ val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+ groupBy(fieldsExpr: _*)
+ }
+
+ /**
+ * Removes duplicate values and returns only distinct (different) values.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.select("key, value").distinct()
+ * }}}
+ */
+ def distinct(): Table = {
+ new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary. You can use
+ * where and select clauses after a join to further specify the behaviour of the join.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+ * }}}
+ */
+ def join(right: Table): Table = {
+ join(right, None, JoinType.INNER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.join(right, "a = b")
+ * }}}
+ */
+ def join(right: Table, joinPredicate: String): Table = {
+ join(right, joinPredicate, JoinType.INNER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.join(right, 'a === 'b).select('a, 'b, 'd)
+ * }}}
+ */
+ def join(right: Table, joinPredicate: Expression): Table = {
+ join(right, Some(joinPredicate), JoinType.INNER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.leftOuterJoin(right, "a = b").select('a, 'b, 'd)
+ * }}}
+ */
+ def leftOuterJoin(right: Table, joinPredicate: String): Table = {
+ join(right, joinPredicate, JoinType.LEFT_OUTER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+ * }}}
+ */
+ def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
+ join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.rightOuterJoin(right, "a = b").select('a, 'b, 'd)
+ * }}}
+ */
+ def rightOuterJoin(right: Table, joinPredicate: String): Table = {
+ join(right, joinPredicate, JoinType.RIGHT_OUTER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+ * }}}
+ */
+ def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
+ join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.fullOuterJoin(right, "a = b").select('a, 'b, 'd)
+ * }}}
+ */
+ def fullOuterJoin(right: Table, joinPredicate: String): Table = {
+ join(right, joinPredicate, JoinType.FULL_OUTER)
+ }
+
+ /**
+ * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
+ * operations must not overlap, use [[as]] to rename fields if necessary.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+ * have nullCheck enabled.
+ *
+ * Example:
+ *
+ * {{{
+ * left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+ * }}}
+ */
+ def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
+ join(right, Some(joinPredicate), JoinType.FULL_OUTER)
+ }
+
+ private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
+ val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
+ join(right, Some(joinPredicateExpr), joinType)
+ }
+
+ private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+ }
+ new Table(
+ tableEnv,
+ Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
+ .validate(tableEnv))
+ }
+
+ /**
+ * Minus of two [[Table]]s with duplicate records removed.
+ * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
+ * exist in the right table. Duplicate records in the left table are returned
+ * exactly once, i.e., duplicates are removed. Both tables must have identical field types.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.minus(right)
+ * }}}
+ */
+ def minus(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be " +
+ "subtracted.")
+ }
+ new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
+ .validate(tableEnv))
+ }
+
+ /**
+ * Minus of two [[Table]]s. Similar to an SQL EXCEPT ALL.
+ * Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in
+ * the right table. A record that is present n times in the left table and m times
+ * in the right table is returned (n - m) times, i.e., as many duplicates as are present
+ * in the right table are removed. Both tables must have identical field types.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.minusAll(right)
+ * }}}
+ */
+ def minusAll(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be " +
+ "subtracted.")
+ }
+ new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
+ .validate(tableEnv))
+ }
+
+ /**
+ * Unions two [[Table]]s with duplicate records removed.
+ * Similar to an SQL UNION. The fields of the two union operations must fully overlap.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.union(right)
+ * }}}
+ */
+ def union(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
+ }
+ new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
+ }
+
+ /**
+ * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
+ * must fully overlap.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.unionAll(right)
+ * }}}
+ */
+ def unionAll(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
+ }
+ new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
+ }
+
+ /**
+ * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that
+ * exist in both tables. If a record is present in one or both tables more than once, it is
+ * returned just once, i.e., the resulting table has no duplicate records. Similar to an
+ * SQL INTERSECT. The fields of the two intersect operations must fully overlap.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.intersect(right)
+ * }}}
+ */
+ def intersect(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException(
+ "Only tables from the same TableEnvironment can be intersected.")
+ }
+ new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
+ }
+
+ /**
+ * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables.
+ * If a record is present in both tables more than once, it is returned as many times as it
+ * is present in both tables, i.e., the resulting table might have duplicate records. Similar
+ * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.
+ *
+ * Note: Both tables must be bound to the same [[TableEnvironment]].
+ *
+ * Example:
+ *
+ * {{{
+ * left.intersectAll(right)
+ * }}}
+ */
+ def intersectAll(right: Table): Table = {
+ // check that right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException(
+ "Only tables from the same TableEnvironment can be intersected.")
+ }
+ new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
+ }
+
+ /**
+ * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+ * The resulting Table is globally sorted across all parallel partitions.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.orderBy('name.desc)
+ * }}}
+ */
+ def orderBy(fields: Expression*): Table = {
+ val order: Seq[Ordering] = fields.map {
+ case o: Ordering => o
+ case e => Asc(e)
+ }
+ new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+ * The resulting Table is sorted globally sorted across all parallel partitions.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.orderBy("name.desc")
+ * }}}
+ */
+ def orderBy(fields: String): Table = {
+ val parsedFields = ExpressionParser.parseExpressionList(fields)
+ orderBy(parsedFields: _*)
+ }
+
+ /**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * Example:
+ *
+ * {{{
+ * // returns unlimited number of records beginning with the 4th record
+ * tab.orderBy('name.desc).limit(3)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
+ def limit(offset: Int): Table = {
+ new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to a specified number of records from an offset position.
+ * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * Example:
+ *
+ * {{{
+ * // returns 5 records beginning with the 4th record
+ * tab.orderBy('name.desc).limit(3, 5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ * @param fetch number of records to be returned
+ */
+ def limit(offset: Int, fetch: Int): Table = {
+ new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL cross join, but it works with a table function. It returns rows from the outer
+ * table (table on the left of the operator) that produces matching values from the table
+ * function (which is defined in the expression on the right side of the operator).
+ *
+ * Example:
+ *
+ * {{{
+ * class MySplitUDTF extends TableFunction[String] {
+ * def eval(str: String): Unit = {
+ * str.split("#").foreach(collect)
+ * }
+ * }
+ *
+ * val split = new MySplitUDTF()
+ * table.join(split('c) as ('s)).select('a,'b,'c,'s)
+ * }}}
+ */
+ def join(udtf: Expression): Table = {
+ joinUdtfInternal(udtf, JoinType.INNER)
+ }
+
+ /**
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL cross join, but it works with a table function. It returns rows from the outer
+ * table (table on the left of the operator) that produces matching values from the table
+ * function (which is defined in the expression on the right side of the operator).
+ *
+ * Example:
+ *
+ * {{{
+ * class MySplitUDTF extends TableFunction<String> {
+ * public void eval(String str) {
+ * str.split("#").forEach(this::collect);
+ * }
+ * }
+ *
+ * TableFunction<String> split = new MySplitUDTF();
+ * tableEnv.registerFunction("split", split);
+ *
+ * table.join("split(c) as (s)").select("a, b, c, s");
+ * }}}
+ */
+ def join(udtf: String): Table = {
+ joinUdtfInternal(udtf, JoinType.INNER)
+ }
+
+ /**
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+ * the rows from the outer table (table on the left of the operator), and rows that do not match
+ * the condition from the table function (which is defined in the expression on the right
+ * side of the operator). Rows with no matching condition are filled with null values.
+ *
+ * Example:
+ *
+ * {{{
+ * class MySplitUDTF extends TableFunction[String] {
+ * def eval(str: String): Unit = {
+ * str.split("#").foreach(collect)
+ * }
+ * }
+ *
+ * val split = new MySplitUDTF()
+ * table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
+ * }}}
+ */
+ def leftOuterJoin(udtf: Expression): Table = {
+ joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
+ }
+
+ /**
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+ * the rows from the outer table (table on the left of the operator), and rows that do not match
+ * the condition from the table function (which is defined in the expression on the right
+ * side of the operator). Rows with no matching condition are filled with null values.
+ *
+ * Example:
+ *
+ * {{{
+ * class MySplitUDTF extends TableFunction<String> {
+ * public void eval(String str) {
+ * str.split("#").forEach(this::collect);
+ * }
+ * }
+ *
+ * TableFunction<String> split = new MySplitUDTF();
+ * tableEnv.registerFunction("split", split);
+ *
+ * table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
+ * }}}
+ */
+ def leftOuterJoin(udtf: String): Table = {
+ joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
+ }
+
+ private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
+ val udtf = ExpressionParser.parseExpression(udtfString)
+ joinUdtfInternal(udtf, joinType)
+ }
+
+ private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
+ var alias: Option[Seq[String]] = None
+
+ // unwrap an Expression until we get a TableFunctionCall
+ def unwrap(expr: Expression): TableFunctionCall = expr match {
+ case Alias(child, name, extraNames) =>
+ alias = Some(Seq(name) ++ extraNames)
+ unwrap(child)
+ case Call(name, args) =>
+ val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
+ unwrap(function)
+ case c: TableFunctionCall => c
+ case _ =>
+ throw new TableException(
+ "Cross/Outer Apply operators only accept expressions that define table functions.")
+ }
+
+ val call = unwrap(udtf)
+ .as(alias)
+ .toLogicalTableFunctionCall(this.logicalPlan)
+ .validate(tableEnv)
+
+ new Table(
+ tableEnv,
+ Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
+ }
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.StreamTableSink]].
+ *
+ * @param sink The [[TableSink]] to which the [[Table]] is written.
+ * @tparam T The data type that the [[TableSink]] expects.
+ */
+ def writeToSink[T](sink: TableSink[T]): Unit = {
+
+ // get schema information of table
+ val rowType = getRelNode.getRowType
+ val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
+ val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+
+ // configure the table sink
+ val configuredSink = sink.configure(fieldNames, fieldTypes)
+
+ // emit the table to the configured table sink
+ tableEnv.writeToSink(this, configuredSink)
+ }
+
+ /**
+ * Groups the records of a table by assigning them to windows defined by a time or row interval.
+ *
+ * For streaming tables of infinite size, grouping into windows is required to define finite
+ * groups on which group-based aggregates can be computed.
+ *
+ * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+ * groupBy.
+ *
+ * __Note__: window on non-grouped streaming table is a non-parallel operation, i.e., all data
+ * will be processed by a single operator.
+ *
+ * @param groupWindow group-window that specifies how elements are grouped.
+ * @return A windowed table.
+ */
+ def window(groupWindow: GroupWindow): GroupWindowedTable = {
+ if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException(s"Windows on batch tables are currently not supported.")
+ }
+ new GroupWindowedTable(this, Seq(), groupWindow)
+ }
+}
+
+/**
+ * A table that has been grouped on a set of grouping keys.
+ */
+class GroupedTable(
+ private[flink] val table: Table,
+ private[flink] val groupKey: Seq[Expression]) {
+
+ /**
+ * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
+ * The field expressions can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupBy('key).select('key, 'value.avg + " The average" as 'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+ val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+ if (propNames.nonEmpty) {
+ throw ValidationException("Window properties can only be used on windowed tables.")
+ }
+
+ val projectsOnAgg = replaceAggregationsAndProperties(
+ fields, table.tableEnv, aggNames, propNames)
+ val projectFields = extractFieldReferences(fields ++ groupKey)
+
+ new Table(table.tableEnv,
+ Project(projectsOnAgg,
+ Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+ Project(projectFields, table.logicalPlan).validate(table.tableEnv)
+ ).validate(table.tableEnv)
+ ).validate(table.tableEnv))
+ }
+
+ /**
+ * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
+ * The field expressions can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupBy("key").select("key, value.avg + ' The average' as average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+
+ /**
+ * Groups the records of a table by assigning them to windows defined by a time or row interval.
+ *
+ * For streaming tables of infinite size, grouping into windows is required to define finite
+ * groups on which group-based aggregates can be computed.
+ *
+ * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+ * groupBy.
+ *
+ * @param groupWindow group-window that specifies how elements are grouped.
+ * @return A windowed table.
+ */
+ def window(groupWindow: GroupWindow): GroupWindowedTable = {
+ if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException(s"Windows on batch tables are currently not supported.")
+ }
+ new GroupWindowedTable(table, groupKey, groupWindow)
+ }
+}
+
+class GroupWindowedTable(
+ private[flink] val table: Table,
+ private[flink] val groupKey: Seq[Expression],
+ private[flink] val window: GroupWindow) {
+
+ /**
+ * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
+ * The field expressions can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+ val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+ val projectsOnAgg = replaceAggregationsAndProperties(
+ fields, table.tableEnv, aggNames, propNames)
+
+ val projectFields = (table.tableEnv, window) match {
+ // event time can be arbitrary field in batch environment
+ case (_: BatchTableEnvironment, w: EventTimeWindow) =>
+ extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
+ case (_, _) =>
+ extractFieldReferences(fields ++ groupKey)
+ }
+
+ new Table(table.tableEnv,
+ Project(
+ projectsOnAgg,
+ WindowAggregate(
+ groupKey,
+ window.toLogicalWindow,
+ propNames.map(a => Alias(a._1, a._2)).toSeq,
+ aggNames.map(a => Alias(a._1, a._2)).toSeq,
+ Project(projectFields, table.logicalPlan).validate(table.tableEnv)
+ ).validate(table.tableEnv)
+ ).validate(table.tableEnv))
+ }
+
+ /**
+ * Performs a selection operation on a group-windows table. Similar to an SQL SELECT statement.
+ * The field expressions can contain complex expressions and aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
new file mode 100644
index 0000000..7e4498d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.plan.logical._
+
+/**
+ * A group-window specification.
+ *
+ * Group-windows group rows based on time or row-count intervals and is therefore essentially a
+ * special type of groupBy. Just like groupBy, group-windows allow to compute aggregates
+ * on groups of elements.
+ *
+ * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
+ * is required to apply aggregations on streaming tables.
+ *
+ * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
+ *
+ */
+trait GroupWindow {
+
+ /**
+ * Converts an API class to a logical window for planning.
+ */
+ private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+ * A group-window operating on event-time.
+ *
+ * @param timeField defines the time mode for streaming tables. For batch table it defines the
+ * time attribute on which is grouped.
+ */
+abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
+
+ protected var name: Option[Expression] = None
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): EventTimeWindow = {
+ this.name = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+ * Tumbling group-window.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+ * grouped by processing-time.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+ /**
+ * Tumbling group-window.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ */
+ def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+ private var alias: Option[Expression] = None
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a tumbling group-window on event-time
+ */
+ def on(timeField: Expression): TumblingEventTimeWindow =
+ new TumblingEventTimeWindow(alias, timeField, size)
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a tumbling group-window on event-time
+ */
+ def on(timeField: String): TumblingEventTimeWindow =
+ on(ExpressionParser.parseExpression(timeField))
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): TumblingWindow = {
+ this.alias = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ ProcessingTimeTumblingGroupWindow(alias, size)
+}
+
+/**
+ * Tumbling group-window on event-time.
+ */
+class TumblingEventTimeWindow(
+ alias: Option[Expression],
+ time: Expression,
+ size: Expression)
+ extends EventTimeWindow(time) {
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Sliding group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+ * Partially specified sliding window.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ */
+class SlideWithSize(size: Expression) {
+
+ /**
+ * Partially specified sliding window.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ */
+ def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+ /**
+ * Specifies the window's slide as time or row-count interval.
+ *
+ * The slide determines the interval in which windows are started. Hence, sliding windows can
+ * overlap if the slide is smaller than the size of the window.
+ *
+ * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this
+ * 15 minutes worth of elements are grouped every 3 minutes and each row contributes to 5
+ * windows.
+ *
+ * @param slide the slide of the window either as time or row-count interval.
+ * @return a sliding group-window
+ */
+ def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
+
+ /**
+ * Specifies the window's slide as time or row-count interval.
+ *
+ * The slide determines the interval in which windows are started. Hence, sliding windows can
+ * overlap if the slide is smaller than the size of the window.
+ *
+ * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this
+ * 15 minutes worth of elements are grouped every 3 minutes and each row contributes to 5
+ * windows.
+ *
+ * @param slide the slide of the window either as time or row-count interval.
+ * @return a sliding group-window
+ */
+ def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
+}
+
+/**
+ * Sliding group-window.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+ * grouped by processing-time.
+ *
+ * @param size the size of the window either as time or row-count interval.
+ */
+class SlidingWindow(
+ size: Expression,
+ slide: Expression)
+ extends GroupWindow {
+
+ private var alias: Option[Expression] = None
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a sliding group-window on event-time
+ */
+ def on(timeField: Expression): SlidingEventTimeWindow =
+ new SlidingEventTimeWindow(alias, timeField, size, slide)
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a sliding group-window on event-time
+ */
+ def on(timeField: String): SlidingEventTimeWindow =
+ on(ExpressionParser.parseExpression(timeField))
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): SlidingWindow = {
+ this.alias = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ ProcessingTimeSlidingGroupWindow(alias, size, slide)
+}
+
+/**
+ * Sliding group-window on event-time.
+ */
+class SlidingEventTimeWindow(
+ alias: Option[Expression],
+ timeField: Expression,
+ size: Expression,
+ slide: Expression)
+ extends EventTimeWindow(timeField) {
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Session group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+ * Session group-window.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+ * grouped by processing-time.
+ *
+ * @param gap the time interval of inactivity before a window is closed.
+ */
+class SessionWindow(gap: Expression) extends GroupWindow {
+
+ /**
+ * Session group-window.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * @param gap the time interval of inactivity before a window is closed.
+ */
+ def this(gap: String) = this(ExpressionParser.parseExpression(gap))
+
+ private var alias: Option[Expression] = None
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a session group-window on event-time
+ */
+ def on(timeField: Expression): SessionEventTimeWindow =
+ new SessionEventTimeWindow(alias, timeField, gap)
+
+ /**
+ * Specifies the time attribute on which rows are grouped.
+ *
+ * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+ * are grouped by processing-time.
+ *
+ * For batch tables, refer to a timestamp or long attribute.
+ *
+ * @param timeField time mode for streaming tables and time attribute for batch tables
+ * @return a session group-window on event-time
+ */
+ def on(timeField: String): SessionEventTimeWindow =
+ on(ExpressionParser.parseExpression(timeField))
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): SessionWindow = {
+ this.alias = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()` clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ ProcessingTimeSessionGroupWindow(alias, gap)
+}
+
+/**
+ * Session group-window on event-time.
+ */
+class SessionEventTimeWindow(
+ alias: Option[Expression],
+ timeField: Expression,
+ gap: Expression)
+ extends EventTimeWindow(timeField) {
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
new file mode 100644
index 0000000..f646caf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+/**
+ * Builder for creating a Calcite configuration.
+ */
+class CalciteConfigBuilder {
+ private var replaceRules: Boolean = false
+ private var ruleSets: List[RuleSet] = Nil
+
+ private var replaceOperatorTable: Boolean = false
+ private var operatorTables: List[SqlOperatorTable] = Nil
+
+ private var replaceSqlParserConfig: Option[SqlParser.Config] = None
+
+ /**
+ * Replaces the built-in rule set with the given rule set.
+ */
+ def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(replaceRuleSet)
+ ruleSets = List(replaceRuleSet)
+ replaceRules = true
+ this
+ }
+
+ /**
+ * Appends the given rule set to the built-in rule set.
+ */
+ def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(addedRuleSet)
+ ruleSets = addedRuleSet :: ruleSets
+ this
+ }
+
+ /**
+ * Replaces the built-in SQL operator table with the given table.
+ */
+ def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(replaceSqlOperatorTable)
+ operatorTables = List(replaceSqlOperatorTable)
+ replaceOperatorTable = true
+ this
+ }
+
+ /**
+ * Appends the given table to the built-in SQL operator table.
+ */
+ def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(addedSqlOperatorTable)
+ this.operatorTables = addedSqlOperatorTable :: this.operatorTables
+ this
+ }
+
+ /**
+ * Replaces the built-in SQL parser configuration with the given configuration.
+ */
+ def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(sqlParserConfig)
+ replaceSqlParserConfig = Some(sqlParserConfig)
+ this
+ }
+
+ private class CalciteConfigImpl(
+ val getRuleSet: Option[RuleSet],
+ val replacesRuleSet: Boolean,
+ val getSqlOperatorTable: Option[SqlOperatorTable],
+ val replacesSqlOperatorTable: Boolean,
+ val getSqlParserConfig: Option[SqlParser.Config])
+ extends CalciteConfig
+
+ /**
+ * Builds a new [[CalciteConfig]].
+ */
+ def build(): CalciteConfig = new CalciteConfigImpl(
+ ruleSets match {
+ case Nil => None
+ case h :: Nil => Some(h)
+ case _ =>
+ // concat rule sets
+ val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c)
+ Some(RuleSets.ofList(concatRules.asJava))
+ },
+ this.replaceRules,
+ operatorTables match {
+ case Nil => None
+ case h :: Nil => Some(h)
+ case _ =>
+ // chain operator tables
+ Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y)))
+ },
+ this.replaceOperatorTable,
+ replaceSqlParserConfig)
+}
+
+/**
+ * Calcite configuration for defining a custom Calcite configuration for Table and SQL API.
+ */
+trait CalciteConfig {
+ /**
+ * Returns whether this configuration replaces the built-in rule set.
+ */
+ def replacesRuleSet: Boolean
+
+ /**
+ * Returns a custom rule set.
+ */
+ def getRuleSet: Option[RuleSet]
+
+ /**
+ * Returns whether this configuration replaces the built-in SQL operator table.
+ */
+ def replacesSqlOperatorTable: Boolean
+
+ /**
+ * Returns a custom SQL operator table.
+ */
+ def getSqlOperatorTable: Option[SqlOperatorTable]
+
+ /**
+ * Returns a custom SQL parser configuration.
+ */
+ def getSqlParserConfig: Option[SqlParser.Config]
+}
+
+object CalciteConfig {
+
+ val DEFAULT = createBuilder().build()
+
+ /**
+ * Creates a new builder for constructing a [[CalciteConfig]].
+ */
+ def createBuilder(): CalciteConfigBuilder = {
+ new CalciteConfigBuilder
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
new file mode 100644
index 0000000..b4a3c42
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.validate.{SqlConformance, SqlValidatorImpl}
+import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+
+/**
+ * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
+ */
+class FlinkCalciteSqlValidator(
+ opTab: SqlOperatorTable,
+ catalogReader: CalciteCatalogReader,
+ typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
+ opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+
+ override def getLogicalSourceRowType(
+ sourceRowType: RelDataType,
+ insert: SqlInsert): RelDataType = {
+ typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType)
+ }
+
+ override def getLogicalTargetRowType(
+ targetRowType: RelDataType,
+ insert: SqlInsert): RelDataType = {
+ typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
+ }
+}