Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:56 UTC

[27/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
new file mode 100644
index 0000000..0634f0b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.calcite.avatica.util.DateTimeUtils._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions._
+import java.math.{BigDecimal => JBigDecimal}
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.table.expressions.ExpressionParser]].
+ */
+trait ImplicitExpressionOperations {
+  private[flink] def expr: Expression
+
+  /**
+    * Enables literals on the left side of binary expressions.
+    *
+    * e.g. 12.toExpr % 'a
+    *
+    * @return expression
+    */
+  def toExpr: Expression = expr
+
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  /**
+    * Returns true if the given boolean expression is true. False otherwise (for null and false).
+    */
+  def isTrue = IsTrue(expr)
+
+  /**
+    * Returns true if the given boolean expression is false. False otherwise (for null and true).
+    */
+  def isFalse = IsFalse(expr)
+
+  /**
+    * Returns true if the given boolean expression is not true (for null and false). False otherwise.
+    */
+  def isNotTrue = IsNotTrue(expr)
+
+  /**
+    * Returns true if the given boolean expression is not false (for null and true). False otherwise.
+    */
+  def isNotFalse = IsNotFalse(expr)
+
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = mod(other)
+
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
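+
+  // Usage sketch for the aggregations above (assuming a table with a numeric
+  // field 'points and a key field 'user, both hypothetical):
+  //   tab.groupBy('user).select('user, 'points.sum, 'points.avg, 'points.count)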
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  /**
+    * Specifies a name for an expression, i.e., a field.
+    *
+    * @param name name for one field
+    * @param extraNames additional names if the expression expands to multiple fields
+    * @return field with an alias
+    */
+  def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
+
+  def asc = Asc(expr)
+  def desc = Desc(expr)
+
+  /**
+    * Returns the start time of a window when applied on a window reference.
+    */
+  def start = WindowStart(expr)
+
+  /**
+    * Returns the end time of a window when applied on a window reference.
+    */
+  def end = WindowEnd(expr)
+
+  /**
+    * Ternary conditional operator that decides which of two other expressions should be evaluated
+    * based on an evaluated boolean condition.
+    *
+    * e.g. (42 > 5).?("A", "B") leads to "A"
+    *
+    * @param ifTrue expression to be evaluated if condition holds
+    * @param ifFalse expression to be evaluated if condition does not hold
+    */
+  def ?(ifTrue: Expression, ifFalse: Expression) = {
+    If(expr, ifTrue, ifFalse)
+  }
+
+  // scalar functions
+
+  /**
+    * Calculates the remainder of dividing the given number by another one.
+    */
+  def mod(other: Expression) = Mod(expr, other)
+
+  /**
+    * Calculates Euler's number raised to the given power.
+    */
+  def exp() = Exp(expr)
+
+  /**
+    * Calculates the base 10 logarithm of the given value.
+    */
+  def log10() = Log10(expr)
+
+  /**
+    * Calculates the natural logarithm of the given value.
+    */
+  def ln() = Ln(expr)
+
+  /**
+    * Calculates the given number raised to the power of the other value.
+    */
+  def power(other: Expression) = Power(expr, other)
+
+  /**
+    * Calculates the square root of a given value.
+    */
+  def sqrt() = Sqrt(expr)
+
+  /**
+    * Calculates the absolute value of the given value.
+    */
+  def abs() = Abs(expr)
+
+  /**
+    * Calculates the largest integer less than or equal to a given number.
+    */
+  def floor() = Floor(expr)
+
+  /**
+    * Calculates the smallest integer greater than or equal to a given number.
+    */
+  def ceil() = Ceil(expr)
+
+  // String operations
+
+  /**
+    * Creates a substring of the given string at the given index for the given length.
+    *
+    * @param beginIndex first character of the substring (starting at 1, inclusive)
+    * @param length number of characters of the substring
+    * @return substring
+    */
+  def substring(beginIndex: Expression, length: Expression) =
+    Substring(expr, beginIndex, length)
+
+  /**
+    * Creates a substring of the given string beginning at the given index to the end.
+    *
+    * @param beginIndex first character of the substring (starting at 1, inclusive)
+    * @return substring
+    */
+  def substring(beginIndex: Expression) =
+    new Substring(expr, beginIndex)
+
+  /**
+    * Removes leading and/or trailing characters from the given string.
+    *
+    * @param removeLeading if true, remove leading characters (default: true)
+    * @param removeTrailing if true, remove trailing characters (default: true)
+    * @param character string containing the character (default: " ")
+    * @return trimmed string
+    */
+  def trim(
+      removeLeading: Boolean = true,
+      removeTrailing: Boolean = true,
+      character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
+    if (removeLeading && removeTrailing) {
+      Trim(TrimMode.BOTH, character, expr)
+    } else if (removeLeading) {
+      Trim(TrimMode.LEADING, character, expr)
+    } else if (removeTrailing) {
+      Trim(TrimMode.TRAILING, character, expr)
+    } else {
+      expr
+    }
+  }
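+
+  // Usage sketch for trim (assuming a hypothetical String field 'name):
+  //   'name.trim()                        // removes leading and trailing spaces
+  //   'name.trim(removeLeading = false)   // removes only trailing spaces
+  //   'name.trim(character = "x")         // removes leading and trailing "x" characters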
+
+  /**
+    * Returns the length of a string.
+    */
+  def charLength() = CharLength(expr)
+
+  /**
+    * Returns all of the characters in a string in upper case using the rules of
+    * the default locale.
+    */
+  def upperCase() = Upper(expr)
+
+  /**
+    * Returns all of the characters in a string in lower case using the rules of
+    * the default locale.
+    */
+  def lowerCase() = Lower(expr)
+
+  /**
+    * Converts the initial letter of each word in a string to uppercase.
+    * Assumes a string containing only [A-Za-z0-9]; everything else is treated as whitespace.
+    */
+  def initCap() = InitCap(expr)
+
+  /**
+    * Returns true if a string matches the specified LIKE pattern.
+    *
+    * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
+    */
+  def like(pattern: Expression) = Like(expr, pattern)
+
+  /**
+    * Returns true if a string matches the specified SQL regex pattern.
+    *
+    * e.g. "A+" matches all strings that consist of at least one A
+    */
+  def similar(pattern: Expression) = Similar(expr, pattern)
+
+  /**
+    * Returns the position of a string in another string, starting at 1.
+    * Returns 0 if the string cannot be found.
+    *
+    * e.g. "a".position("bbbbba") leads to 6
+    */
+  def position(haystack: Expression) = Position(expr, haystack)
+
+  /**
+    * Replaces a substring of the given string with another string, starting at a position
+    * (starting at 1).
+    *
+    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
+    */
+  def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting)
+
+  /**
+    * Replaces a substring of the given string with another string, starting at a position
+    * (starting at 1).
+    * The length specifies how many characters should be removed.
+    *
+    * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+    */
+  def overlay(newString: Expression, starting: Expression, length: Expression) =
+    Overlay(expr, newString, starting, length)
+
+  // Temporal operations
+
+  /**
+    * Parses a date string in the form "yy-mm-dd" to a SQL Date.
+    */
+  def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
+
+  /**
+    * Parses a time string in the form "hh:mm:ss" to a SQL Time.
+    */
+  def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
+
+  /**
+    * Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
+    */
+  def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
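+
+  // Usage sketch for the parsing casts above (the literal values are illustrative):
+  //   "2016-06-27".toDate                    // SQL DATE
+  //   "12:44:31".toTime                      // SQL TIME
+  //   "2016-06-27 12:44:31.999".toTimestamp  // SQL TIMESTAMP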
+
+  /**
+    * Extracts parts of a time point or time interval. Returns the part as a long value.
+    *
+    * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
+    */
+  def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
+
+  /**
+    * Returns the quarter of a year from a SQL date.
+    *
+    * e.g. "1994-09-27".toDate.quarter() leads to 3
+    */
+  def quarter() = Quarter(expr)
+
+  /**
+    * Rounds down a time point to the given unit.
+    *
+    * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
+    */
+  def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
+
+  /**
+    * Rounds up a time point to the given unit.
+    *
+    * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
+    */
+  def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
+
+  // Interval types
+
+  /**
+    * Creates an interval of the given number of years.
+    *
+    * @return interval of months
+    */
+  def year = toMonthInterval(expr, 12)
+
+  /**
+    * Creates an interval of the given number of years.
+    *
+    * @return interval of months
+    */
+  def years = year
+
+  /**
+    * Creates an interval of the given number of months.
+    *
+    * @return interval of months
+    */
+  def month = toMonthInterval(expr, 1)
+
+  /**
+    * Creates an interval of the given number of months.
+    *
+    * @return interval of months
+    */
+  def months = month
+
+  /**
+    * Creates an interval of the given number of days.
+    *
+    * @return interval of milliseconds
+    */
+  def day = toMilliInterval(expr, MILLIS_PER_DAY)
+
+  /**
+    * Creates an interval of the given number of days.
+    *
+    * @return interval of milliseconds
+    */
+  def days = day
+
+  /**
+    * Creates an interval of the given number of hours.
+    *
+    * @return interval of milliseconds
+    */
+  def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
+
+  /**
+    * Creates an interval of the given number of hours.
+    *
+    * @return interval of milliseconds
+    */
+  def hours = hour
+
+  /**
+    * Creates an interval of the given number of minutes.
+    *
+    * @return interval of milliseconds
+    */
+  def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+  /**
+    * Creates an interval of the given number of minutes.
+    *
+    * @return interval of milliseconds
+    */
+  def minutes = minute
+
+  /**
+    * Creates an interval of the given number of seconds.
+    *
+    * @return interval of milliseconds
+    */
+  def second = toMilliInterval(expr, MILLIS_PER_SECOND)
+
+  /**
+    * Creates an interval of the given number of seconds.
+    *
+    * @return interval of milliseconds
+    */
+  def seconds = second
+
+  /**
+    * Creates an interval of the given number of milliseconds.
+    *
+    * @return interval of milliseconds
+    */
+  def milli = toMilliInterval(expr, 1)
+
+  /**
+    * Creates an interval of the given number of milliseconds.
+    *
+    * @return interval of milliseconds
+    */
+  def millis = milli
+
+  // row interval type
+
+  /**
+    * Creates an interval of rows.
+    *
+    * @return interval of rows
+    */
+  def rows = toRowInterval(expr)
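+
+  // Usage sketch: interval expressions are built from numeric literals, e.g.
+  //   2.hours + 30.minutes   // time interval of 9000000 milliseconds
+  //   10.rows                // row-count interval of ten rows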
+
+  /**
+    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+    * returns its value.
+    *
+    * @param name name of the field (similar to Flink's field expressions)
+    * @return value of the field
+    */
+  def get(name: String) = GetCompositeField(expr, name)
+
+  /**
+    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+    * returns its value.
+    *
+    * @param index position of the field
+    * @return value of the field
+    */
+  def get(index: Int) = GetCompositeField(expr, index)
+
+  /**
+    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+    * into a flat representation where every subtype is a separate field.
+    */
+  def flatten() = Flattening(expr)
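+
+  // Usage sketch for composite access (assuming a hypothetical POJO field 'user
+  // with members "name" and "age"):
+  //   'user.get("name")   // value of member "name"
+  //   'user.flatten()     // one flat field per member of 'user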
+
+  /**
+    * Accesses the element of an array based on an index (starting at 1).
+    *
+    * @param index position of the element (starting at 1)
+    * @return value of the element
+    */
+  def at(index: Expression) = ArrayElementAt(expr, index)
+
+  /**
+    * Returns the number of elements of an array.
+    *
+    * @return number of elements
+    */
+  def cardinality() = ArrayCardinality(expr)
+
+  /**
+    * Returns the sole element of an array with a single element. Returns null if the array is
+    * empty. Throws an exception if the array has more than one element.
+    *
+    * @return the first and only element of an array with a single element
+    */
+  def element() = ArrayElement(expr)
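+
+  // Usage sketch for the array operations above (assuming a hypothetical
+  // array-typed field 'arr):
+  //   'arr.at(1)          // first element (indices start at 1)
+  //   'arr.cardinality()  // number of elements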
+}
+
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
+  implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations {
+    def expr = Literal(b)
+  }
+
+  implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations {
+    def expr = Literal(s)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  implicit class LiteralJavaDecimalExpression(javaDecimal: java.math.BigDecimal)
+      extends ImplicitExpressionOperations {
+    def expr = Literal(javaDecimal)
+  }
+
+  implicit class LiteralScalaDecimalExpression(scalaDecimal: scala.math.BigDecimal)
+      extends ImplicitExpressionOperations {
+    def expr = Literal(scalaDecimal.bigDecimal)
+  }
+
+  implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations {
+    def expr = Literal(sqlDate)
+  }
+
+  implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations {
+    def expr = Literal(sqlTime)
+  }
+
+  implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp)
+      extends ImplicitExpressionOperations {
+    def expr = Literal(sqlTimestamp)
+  }
+
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+  implicit def byte2Literal(b: Byte): Expression = Literal(b)
+  implicit def short2Literal(s: Short): Expression = Literal(s)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(d: Float): Expression = Literal(d)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+  implicit def javaDec2Literal(javaDec: JBigDecimal): Expression = Literal(javaDec)
+  implicit def scalaDec2Literal(scalaDec: BigDecimal): Expression =
+    Literal(scalaDec.bigDecimal)
+  implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
+  implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
+  implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression =
+    Literal(sqlTimestamp)
+  implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array)
+}
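+
+// Usage sketch: with these conversions in scope (via
+// "import org.apache.flink.table.api.scala._"), plain Scala values mix into
+// expressions; 'amount is a hypothetical field reference:
+//   'amount * 0.19    // Symbol becomes a field reference, Double a literal
+//   12.toExpr % 'a    // explicit conversion for a literal on the left side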
+
+// ------------------------------------------------------------------------------------------------
+// Expressions with no parameters
+// ------------------------------------------------------------------------------------------------
+
+// we disable the object name checker here as it requires object names to start with
+// a capital letter, but we want objects to look like functions in certain cases,
+// e.g. array(1, 2, 3)
+// scalastyle:off object.name
+
+/**
+  * Returns the current SQL date in UTC time zone.
+  */
+object currentDate {
+
+  /**
+    * Returns the current SQL date in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentDate()
+  }
+}
+
+/**
+  * Returns the current SQL time in UTC time zone.
+  */
+object currentTime {
+
+  /**
+    * Returns the current SQL time in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentTime()
+  }
+}
+
+/**
+  * Returns the current SQL timestamp in UTC time zone.
+  */
+object currentTimestamp {
+
+  /**
+    * Returns the current SQL timestamp in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentTimestamp()
+  }
+}
+
+/**
+  * Returns the current SQL time in local time zone.
+  */
+object localTime {
+
+  /**
+    * Returns the current SQL time in local time zone.
+    */
+  def apply(): Expression = {
+    LocalTime()
+  }
+}
+
+/**
+  * Returns the current SQL timestamp in local time zone.
+  */
+object localTimestamp {
+
+  /**
+    * Returns the current SQL timestamp in local time zone.
+    */
+  def apply(): Expression = {
+    LocalTimestamp()
+  }
+}
+
+/**
+  * Determines whether two anchored time intervals overlap. Time point and temporal values
+  * are transformed into a range defined by two time points (start, end).
+  *
+  * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+  *
+  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
+  */
+object temporalOverlaps {
+
+  /**
+    * Determines whether two anchored time intervals overlap. Time point and temporal values
+    * are transformed into a range defined by two time points (start, end).
+    *
+    * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+    *
+    * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
+    */
+  def apply(
+      leftTimePoint: Expression,
+      leftTemporal: Expression,
+      rightTimePoint: Expression,
+      rightTemporal: Expression): Expression = {
+    TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+  }
+}
+
+/**
+  * Creates an array of literals. The array will be an array of objects (not primitives).
+  */
+object array {
+
+  /**
+    * Creates an array of literals. The array will be an array of objects (not primitives).
+    */
+  def apply(head: Expression, tail: Expression*): Expression = {
+    ArrayConstructor(head +: tail.toSeq)
+  }
+}
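+
+// Usage sketch for the array constructor:
+//   array(1, 2, 3).at(2)   // evaluates to 2 (element indices start at 1)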
+
+// scalastyle:on object.name

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
new file mode 100644
index 0000000..0e4c1c7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
+  * elements into intervals of 5 minutes.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
+    * windows. For example, a tumbling window of 5 minutes groups
+    * elements into intervals of 5 minutes.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
+}
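+
+// Usage sketch (assuming the windowed table is subsequently aggregated with a
+// select, as on the other table types):
+//   tab.groupBy('key).window(Tumble over 10.minutes).select('key, 'value.avg)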
+
+/**
+  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows are overlapping. Thus, an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of size 15 minutes with a 5 minute sliding interval groups elements
+  * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+  * window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows are overlapping. Thus, an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of size 15 minutes with a 5 minute sliding interval groups
+    * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
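+
+// Usage sketch (the "every" clause that completes the partially specified
+// window is assumed to be defined on SlideWithSize):
+//   tab.groupBy('key).window(Slide over 15.minutes every 5.minutes).select('key, 'value.avg)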
+
+/**
+  * Helper object for creating a session window. The boundaries of session windows are defined by
+  * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+  * gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+}
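+
+// Usage sketch:
+//   tab.groupBy('key).window(Session withGap 10.minutes).select('key, 'value.max)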

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..cd341cb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.types.Row
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.reflect.ClassTag
+
+/**
+  * == Table API (Scala) ==
+  *
+  * Importing this package with:
+  *
+  * {{{
+  *   import org.apache.flink.table.api.scala._
+  * }}}
+  *
+  * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
+  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+  * a look at [[Table]] to see which operations are supported and
+  * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
+  * expression can be specified.
+  *
+  * When writing a query you can use Scala Symbols to refer to field names. One would
+  * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+  * Scala literal to an Expression literal, in those cases use `Literal`, as in `Literal(3)`.
+  *
+  * Example:
+  *
+  * {{{
+  *   import org.apache.flink.api.scala._
+  *   import org.apache.flink.table.api.scala._
+  *
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+  *   val result = input
+  *         .toTable(tEnv, 'word, 'count)
+  *         .groupBy('word)
+  *         .select('word, 'count.avg)
+  *
+  *   result.print()
+  * }}}
+  *
+  */
+package object scala extends ImplicitExpressionConversions {
+
+  implicit def table2TableConversions(table: Table): TableConversions = {
+    new TableConversions(table)
+  }
+
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType())
+  }
+
+  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+    val tableEnv = table.tableEnv.asInstanceOf[ScalaBatchTableEnv]
+    tableEnv.toDataSet[Row](table)
+  }
+
+  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](set, set.dataType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+    val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
+    tableEnv.toDataStream[Row](table)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
new file mode 100644
index 0000000..6322026
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.logical.Minus
+import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias}
+import org.apache.flink.table.plan.ProjectionTranslator._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.sinks.TableSink
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * A Table is the core component of the Table API.
+  * Similar to how the batch and streaming APIs have DataSet and DataStream,
+  * the Table API is built around [[Table]].
+  *
+  * Use the methods of [[Table]] to transform data. Use [[TableEnvironment]] to convert a [[Table]]
+  * back to a DataSet or DataStream.
+  *
+  * When using Scala a [[Table]] can also be converted using implicit conversions.
+  *
+  * Example:
+  *
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val set: DataSet[(String, Int)] = ...
+  *   val table = set.toTable(tEnv, 'a, 'b)
+  *   ...
+  *   val table2 = ...
+  *   val set2: DataSet[MyType] = table2.toDataSet[MyType]
+  * }}}
+  *
+  * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
+  * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
+  * syntax.
+  *
+  * @param tableEnv The [[TableEnvironment]] to which the table is bound.
+  * @param logicalPlan logical representation
+  */
+class Table(
+    private[flink] val tableEnv: TableEnvironment,
+    private[flink] val logicalPlan: LogicalNode) {
+
+  def relBuilder = tableEnv.getRelBuilder
+
+  def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
+
+  /**
+    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.select('key, 'value.avg + " The average" as 'average)
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+    val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
+    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
+    if (propNames.nonEmpty) {
+      throw ValidationException("Window properties can only be used on windowed tables.")
+    }
+
+    if (aggNames.nonEmpty) {
+      val projectsOnAgg = replaceAggregationsAndProperties(
+        expandedFields, tableEnv, aggNames, propNames)
+      val projectFields = extractFieldReferences(expandedFields)
+
+      new Table(tableEnv,
+        Project(projectsOnAgg,
+          Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+            Project(projectFields, logicalPlan).validate(tableEnv)
+          ).validate(tableEnv)
+        ).validate(tableEnv)
+      )
+    } else {
+      new Table(tableEnv,
+        Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
+    }
+  }
+
+  /**
+    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.select("key, value.avg + ' The average' as average")
+    * }}}
+    */
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
+  /**
+    * Renames the fields of the expression result. Use this to disambiguate fields before
+    * joining two tables.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.as('a, 'b)
+    * }}}
+    */
+  def as(fields: Expression*): Table = {
+    new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Renames the fields of the expression result. Use this to disambiguate fields before
+    * joining two tables.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.as("a, b")
+    * }}}
+    */
+  def as(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    as(fieldExprs: _*)
+  }
+
+  /**
+    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.filter('name === "Fred")
+    * }}}
+    */
+  def filter(predicate: Expression): Table = {
+    new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.filter("name = 'Fred'")
+    * }}}
+    */
+  def filter(predicate: String): Table = {
+    val predicateExpr = ExpressionParser.parseExpression(predicate)
+    filter(predicateExpr)
+  }
+
+  /**
+    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.where('name === "Fred")
+    * }}}
+    */
+  def where(predicate: Expression): Table = {
+    filter(predicate)
+  }
+
+  /**
+    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+    * clause.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.where("name = 'Fred'")
+    * }}}
+    */
+  def where(predicate: String): Table = {
+    filter(predicate)
+  }
+
+  /**
+    * Groups the elements on some grouping keys. Use this before a selection with aggregations
+    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.groupBy('key).select('key, 'value.avg)
+    * }}}
+    */
+  def groupBy(fields: Expression*): GroupedTable = {
+    new GroupedTable(this, fields)
+  }
+
+  /**
+    * Groups the elements on some grouping keys. Use this before a selection with aggregations
+    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.groupBy("key").select("key, value.avg")
+    * }}}
+    */
+  def groupBy(fields: String): GroupedTable = {
+    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+    groupBy(fieldsExpr: _*)
+  }
+
+  /**
+    * Removes duplicate values and returns only distinct (different) values.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.select("key, value").distinct()
+    * }}}
+    */
+  def distinct(): Table = {
+    new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary. You can use
+    * where and select clauses after a join to further specify the behaviour of the join.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+    * }}}
+    */
+  def join(right: Table): Table = {
+    join(right, None, JoinType.INNER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.join(right, "a = b")
+    * }}}
+    */
+  def join(right: Table, joinPredicate: String): Table = {
+    join(right, joinPredicate, JoinType.INNER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.join(right, 'a === 'b).select('a, 'b, 'd)
+    * }}}
+    */
+  def join(right: Table, joinPredicate: Expression): Table = {
+    join(right, Some(joinPredicate), JoinType.INNER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.leftOuterJoin(right, "a = b").select('a, 'b, 'd)
+    * }}}
+    */
+  def leftOuterJoin(right: Table, joinPredicate: String): Table = {
+    join(right, joinPredicate, JoinType.LEFT_OUTER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+    * }}}
+    */
+  def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
+    join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.rightOuterJoin(right, "a = b").select('a, 'b, 'd)
+    * }}}
+    */
+  def rightOuterJoin(right: Table, joinPredicate: String): Table = {
+    join(right, joinPredicate, JoinType.RIGHT_OUTER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+    * }}}
+    */
+  def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
+    join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.fullOuterJoin(right, "a = b").select('a, 'b, 'd)
+    * }}}
+    */
+  def fullOuterJoin(right: Table, joinPredicate: String): Table = {
+    join(right, joinPredicate, JoinType.FULL_OUTER)
+  }
+
+  /**
+    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
+    * tables must not overlap; use [[as]] to rename fields if necessary.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
+    * have nullCheck enabled.
+    *
+    * Example:
+    *
+    * {{{
+    *   left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+    * }}}
+    */
+  def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
+    join(right, Some(joinPredicate), JoinType.FULL_OUTER)
+  }
+
+  private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
+    val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
+    join(right, Some(joinPredicateExpr), joinType)
+  }
+
+  private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+    }
+    new Table(
+      tableEnv,
+      Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
+        .validate(tableEnv))
+  }
+
+  /**
+    * Minus of two [[Table]]s with duplicate records removed.
+    * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
+    * exist in the right table. Duplicate records in the left table are returned
+    * exactly once, i.e., duplicates are removed. Both tables must have identical field types.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.minus(right)
+    * }}}
+    */
+  def minus(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be " +
+        "subtracted.")
+    }
+    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
+      .validate(tableEnv))
+  }
+
+  /**
+    * Minus of two [[Table]]s. Similar to a SQL EXCEPT ALL clause.
+    * MinusAll returns the records that do not exist in
+    * the right table. A record that is present n times in the left table and m times
+    * in the right table is returned (n - m) times, i.e., as many duplicates as are present
+    * in the right table are removed. Both tables must have identical field types.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.minusAll(right)
+    * }}}
+    */
+  def minusAll(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be " +
+        "subtracted.")
+    }
+    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
+      .validate(tableEnv))
+  }
+
+  /**
+    * Unions two [[Table]]s with duplicate records removed.
+    * Similar to an SQL UNION. The fields of the two union operations must fully overlap.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.union(right)
+    * }}}
+    */
+  def union(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
+    }
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
+  }
+
+  /**
+    * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
+    * must fully overlap.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.unionAll(right)
+    * }}}
+    */
+  def unionAll(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
+    }
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
+  }
+
+  /**
+    * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that
+    * exist in both tables. If a record is present in one or both tables more than once, it is
+    * returned just once, i.e., the resulting table has no duplicate records. Similar to an
+    * SQL INTERSECT. The fields of the two intersect operations must fully overlap.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.intersect(right)
+    * }}}
+    */
+  def intersect(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException(
+        "Only tables from the same TableEnvironment can be intersected.")
+    }
+    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
+  }
+
+  /**
+    * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables.
+    * If a record is present in both tables more than once, it is returned as many times as it
+    * is present in both tables, i.e., the resulting table might have duplicate records. Similar
+    * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.intersectAll(right)
+    * }}}
+    */
+  def intersectAll(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException(
+        "Only tables from the same TableEnvironment can be intersected.")
+    }
+    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
+  }
+
+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+    * The resulting Table is globally sorted across all parallel partitions.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy('name.desc)
+    * }}}
+    */
+  def orderBy(fields: Expression*): Table = {
+    val order: Seq[Ordering] = fields.map {
+      case o: Ordering => o
+      case e => Asc(e)
+    }
+    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
+    * The resulting Table is globally sorted across all parallel partitions.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy("name.desc")
+    * }}}
+    */
+  def orderBy(fields: String): Table = {
+    val parsedFields = ExpressionParser.parseExpressionList(fields)
+    orderBy(parsedFields: _*)
+  }
+
+  /**
+    * Limits a sorted result from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns an unlimited number of records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3)
+    * }}}
+    *
+    * @param offset number of records to skip
+    */
+  def limit(offset: Int): Table = {
+    new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to a specified number of records from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns 5 records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3, 5)
+    * }}}
+    *
+    * @param offset number of records to skip
+    * @param fetch number of records to be returned
+    */
+  def limit(offset: Int, fetch: Int): Table = {
+    new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL cross join, but it works with a table function. It returns rows from the outer
+    * table (table on the left of the operator) for which the table function (defined in the
+    * expression on the right side of the operator) produces matching values.
+    *
+    * Example:
+    *
+    * {{{
+    *   class MySplitUDTF extends TableFunction[String] {
+    *     def eval(str: String): Unit = {
+    *       str.split("#").foreach(collect)
+    *     }
+    *   }
+    *
+    *   val split = new MySplitUDTF()
+    *   table.join(split('c) as ('s)).select('a,'b,'c,'s)
+    * }}}
+    */
+  def join(udtf: Expression): Table = {
+    joinUdtfInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL cross join, but it works with a table function. It returns rows from the outer
+    * table (table on the left of the operator) for which the table function (defined in the
+    * expression on the right side of the operator) produces matching values.
+    *
+    * Example:
+    *
+    * {{{
+    *   class MySplitUDTF extends TableFunction<String> {
+    *     public void eval(String str) {
+    *       str.split("#").forEach(this::collect);
+    *     }
+    *   }
+    *
+    *   TableFunction<String> split = new MySplitUDTF();
+    *   tableEnv.registerFunction("split", split);
+    *
+    *   table.join("split(c) as (s)").select("a, b, c, s");
+    * }}}
+    */
+  def join(udtf: String): Table = {
+    joinUdtfInternal(udtf, JoinType.INNER)
+  }
+
+  /**
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+    * the rows from the outer table (table on the left of the operator), including rows for which
+    * the table function (defined in the expression on the right side of the operator) returns
+    * no match. Rows without a match are padded with null values.
+    *
+    * Example:
+    *
+    * {{{
+    *   class MySplitUDTF extends TableFunction[String] {
+    *     def eval(str: String): Unit = {
+    *       str.split("#").foreach(collect)
+    *     }
+    *   }
+    *
+    *   val split = new MySplitUDTF()
+    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
+    * }}}
+    */
+  def leftOuterJoin(udtf: Expression): Table = {
+    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  /**
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+    * the rows from the outer table (table on the left of the operator), including rows for which
+    * the table function (defined in the expression on the right side of the operator) returns
+    * no match. Rows without a match are padded with null values.
+    *
+    * Example:
+    *
+    * {{{
+    *   class MySplitUDTF extends TableFunction<String> {
+    *     public void eval(String str) {
+    *       str.split("#").forEach(this::collect);
+    *     }
+    *   }
+    *
+    *   TableFunction<String> split = new MySplitUDTF();
+    *   tableEnv.registerFunction("split", split);
+    *
+    *   table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
+    * }}}
+    */
+  def leftOuterJoin(udtf: String): Table = {
+    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
+  }
+
+  private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
+    val udtf = ExpressionParser.parseExpression(udtfString)
+    joinUdtfInternal(udtf, joinType)
+  }
+
+  private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
+    var alias: Option[Seq[String]] = None
+
+    // unwrap an Expression until we get a TableFunctionCall
+    def unwrap(expr: Expression): TableFunctionCall = expr match {
+      case Alias(child, name, extraNames) =>
+        alias = Some(Seq(name) ++ extraNames)
+        unwrap(child)
+      case Call(name, args) =>
+        val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
+        unwrap(function)
+      case c: TableFunctionCall => c
+      case _ =>
+        throw new TableException(
+          "Cross/Outer Apply operators only accept expressions that define table functions.")
+    }
+
+    val call = unwrap(udtf)
+      .as(alias)
+      .toLogicalTableFunctionCall(this.logicalPlan)
+      .validate(tableEnv)
+
+    new Table(
+      tableEnv,
+      Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
+  }
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.StreamTableSink]].
+    *
+    * @param sink The [[TableSink]] to which the [[Table]] is written.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  def writeToSink[T](sink: TableSink[T]): Unit = {
+
+    // get schema information of table
+    val rowType = getRelNode.getRowType
+    val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
+    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+
+    // configure the table sink
+    val configuredSink = sink.configure(fieldNames, fieldTypes)
+
+    // emit the table to the configured table sink
+    tableEnv.writeToSink(this, configuredSink)
+  }
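+
+  // Usage sketch (CsvTableSink is one TableSink implementation in flink-table;
+  // the path and delimiter are illustrative):
+  //   result.writeToSink(new CsvTableSink("/tmp/result.csv", fieldDelim = "|"))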
+
+  /**
+    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    *
+    * For streaming tables of infinite size, grouping into windows is required to define finite
+    * groups on which group-based aggregates can be computed.
+    *
+    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+    * groupBy.
+    *
+    * __Note__: A window on a non-grouped streaming table is a non-parallel operation, i.e., all
+    * data will be processed by a single operator.
+    *
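+    * Example (a sketch; assumes the `Tumble` window helper and a `'rowtime` attribute):
+    *
+    * {{{
+    *   table
+    *     .window(Tumble over 10.minutes on 'rowtime as 'w)
+    *     .select('w.start, 'value.avg)
+    * }}}
+    *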
+    * @param groupWindow group-window that specifies how elements are grouped.
+    * @return A windowed table.
+    */
+  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException("Windows on batch tables are currently not supported.")
+    }
+    new GroupWindowedTable(this, Seq(), groupWindow)
+  }
+}
+
+/**
+  * A table that has been grouped on a set of grouping keys.
+  */
+class GroupedTable(
+  private[flink] val table: Table,
+  private[flink] val groupKey: Seq[Expression]) {
+
+  /**
+    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.groupBy('key).select('key, 'value.avg + " The average" as 'average)
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+    if (propNames.nonEmpty) {
+      throw ValidationException("Window properties can only be used on windowed tables.")
+    }
+
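+    // project the required input fields, aggregate on the group keys, and evaluate the final
+    // expressions on top of the aggregation results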
+    val projectsOnAgg = replaceAggregationsAndProperties(
+      fields, table.tableEnv, aggNames, propNames)
+    val projectFields = extractFieldReferences(fields ++ groupKey)
+
+    new Table(table.tableEnv,
+      Project(projectsOnAgg,
+        Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
+        ).validate(table.tableEnv)
+      ).validate(table.tableEnv))
+  }
+
+  /**
+    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
+    * }}}
+    */
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
+  /**
+    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    *
+    * For streaming tables of infinite size, grouping into windows is required to define finite
+    * groups on which group-based aggregates can be computed.
+    *
+    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+    * groupBy.
+    *
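+    * Example (a sketch; assumes the `Tumble` window helper and a `'rowtime` attribute):
+    *
+    * {{{
+    *   tab
+    *     .groupBy('key)
+    *     .window(Tumble over 10.minutes on 'rowtime as 'w)
+    *     .select('key, 'w.start, 'value.avg)
+    * }}}
+    *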
+    * @param groupWindow group-window that specifies how elements are grouped.
+    * @return A windowed table.
+    */
+  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException("Windows on batch tables are currently not supported.")
+    }
+    new GroupWindowedTable(table, groupKey, groupWindow)
+  }
+}
+
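+/**
+  * A table that has been windowed by a [[GroupWindow]] specification, optionally on top of a
+  * set of grouping keys.
+  */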
+class GroupWindowedTable(
+    private[flink] val table: Table,
+    private[flink] val groupKey: Seq[Expression],
+    private[flink] val window: GroupWindow) {
+
+  /**
+    * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+    val projectsOnAgg = replaceAggregationsAndProperties(
+      fields, table.tableEnv, aggNames, propNames)
+
+    val projectFields = (table.tableEnv, window) match {
+      // event time can be arbitrary field in batch environment
+      case (_: BatchTableEnvironment, w: EventTimeWindow) =>
+        extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
+      case (_, _) =>
+        extractFieldReferences(fields ++ groupKey)
+    }
+
+    new Table(table.tableEnv,
+      Project(
+        projectsOnAgg,
+        WindowAggregate(
+          groupKey,
+          window.toLogicalWindow,
+          propNames.map(a => Alias(a._1, a._2)).toSeq,
+          aggNames.map(a => Alias(a._1, a._2)).toSeq,
+          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
+        ).validate(table.tableEnv)
+      ).validate(table.tableEnv))
+  }
+
+  /**
+    * Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
+    * }}}
+    */
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
new file mode 100644
index 0000000..7e4498d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.plan.logical._
+
+/**
+  * A group-window specification.
+  *
+  * Group-windows group rows based on time or row-count intervals and are therefore essentially a
+  * special type of groupBy. Just like groupBy, group-windows allow computing aggregates
+  * on groups of elements.
+  *
+  * Infinite streaming tables can only be grouped into time or row intervals. Hence, window
+  * grouping is required to apply aggregations on streaming tables.
+  *
+  * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
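+  *
+  * For illustration (a sketch; assumes the `Tumble`, `Slide`, and `Session` helper objects):
+  *
+  * {{{
+  *   Tumble over 10.minutes on 'rowtime as 'w    // tumbling group-window
+  *   Slide over 10.minutes every 5.minutes       // sliding group-window
+  *   Session withGap 30.minutes                  // session group-window
+  * }}}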
+  *
+  */
+trait GroupWindow {
+
+  /**
+    * Converts an API class to a logical window for planning.
+    */
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables. For batch tables it defines the
+  *                  time attribute on which rows are grouped.
+  */
+abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): EventTimeWindow = {
+    this.name = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
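+  * Example (a sketch; assumes the `Tumble` helper object):
+  *
+  * {{{
+  *   Tumble over 10.minutes on 'rowtime as 'w
+  * }}}
+  *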
+  * @param size the size of the window either as time or row-count interval.
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+    * Tumbling group-window.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * @param size the size of the window either as time or row-count interval.
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: Expression): TumblingEventTimeWindow =
+    new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: String): TumblingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): TumblingWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeTumblingGroupWindow(alias, size)
+}
+
+/**
+  * Tumbling group-window on event-time.
+  */
+class TumblingEventTimeWindow(
+    alias: Option[Expression],
+    time: Expression,
+    size: Expression)
+  extends EventTimeWindow(time) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Sliding group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Partially specified sliding window.
+  *
+  * @param size the size of the window either as time or row-count interval.
+  */
+class SlideWithSize(size: Expression) {
+
+  /**
+    * Partially specified sliding window.
+    *
+    * @param size the size of the window either as time or row-count interval.
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  /**
+    * Specifies the window's slide as time or row-count interval.
+    *
+    * The slide determines the interval in which windows are started. Hence, sliding windows can
+    * overlap if the slide is smaller than the size of the window.
+    *
+    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
+    * 15 minutes worth of elements are grouped every 3 minutes, and each row contributes to 5
+    * windows.
+    *
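+    * Example (a sketch; assumes the `Slide` helper object):
+    *
+    * {{{
+    *   // 15 minute windows starting every 3 minutes
+    *   Slide over 15.minutes every 3.minutes
+    * }}}
+    *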
+    * @param slide the slide of the window either as time or row-count interval.
+    * @return a sliding group-window
+    */
+  def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
+
+  /**
+    * Specifies the window's slide as time or row-count interval.
+    *
+    * The slide determines the interval in which windows are started. Hence, sliding windows can
+    * overlap if the slide is smaller than the size of the window.
+    *
+    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
+    * 15 minutes worth of elements are grouped every 3 minutes, and each row contributes to 5
+    * windows.
+    *
+    * @param slide the slide of the window either as time or row-count interval.
+    * @return a sliding group-window
+    */
+  def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
+}
+
+/**
+  * Sliding group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
+  * @param size the size of the window either as time or row-count interval.
+  */
+class SlidingWindow(
+    size: Expression,
+    slide: Expression)
+  extends GroupWindow {
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a sliding group-window on event-time
+    */
+  def on(timeField: Expression): SlidingEventTimeWindow =
+    new SlidingEventTimeWindow(alias, timeField, size, slide)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a sliding group-window on event-time
+    */
+  def on(timeField: String): SlidingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): SlidingWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeSlidingGroupWindow(alias, size, slide)
+}
+
+/**
+  * Sliding group-window on event-time.
+  */
+class SlidingEventTimeWindow(
+    alias: Option[Expression],
+    timeField: Expression,
+    size: Expression,
+    slide: Expression)
+  extends EventTimeWindow(timeField) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Session group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Session group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
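+  * Example (a sketch; assumes the `Session` helper object):
+  *
+  * {{{
+  *   Session withGap 10.minutes on 'rowtime as 'w
+  * }}}
+  *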
+  * @param gap the time interval of inactivity before a window is closed.
+  */
+class SessionWindow(gap: Expression) extends GroupWindow {
+
+  /**
+    * Session group-window.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * @param gap the time interval of inactivity before a window is closed.
+    */
+  def this(gap: String) = this(ExpressionParser.parseExpression(gap))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a session group-window on event-time
+    */
+  def on(timeField: Expression): SessionEventTimeWindow =
+    new SessionEventTimeWindow(alias, timeField, gap)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a session group-window on event-time
+    */
+  def on(timeField: String): SessionEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): SessionWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeSessionGroupWindow(alias, gap)
+}
+
+/**
+  * Session group-window on event-time.
+  */
+class SessionEventTimeWindow(
+    alias: Option[Expression],
+    timeField: Expression,
+    gap: Expression)
+  extends EventTimeWindow(timeField) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
new file mode 100644
index 0000000..f646caf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+/**
+  * Builder for creating a Calcite configuration.
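+  *
+  * Example (a minimal sketch; `myRuleSet` stands for any custom [[RuleSet]]):
+  *
+  * {{{
+  *   val config: CalciteConfig = CalciteConfig.createBuilder()
+  *     .addRuleSet(myRuleSet)
+  *     .build()
+  * }}}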
+  */
+class CalciteConfigBuilder {
+  private var replaceRules: Boolean = false
+  private var ruleSets: List[RuleSet] = Nil
+
+  private var replaceOperatorTable: Boolean = false
+  private var operatorTables: List[SqlOperatorTable] = Nil
+
+  private var replaceSqlParserConfig: Option[SqlParser.Config] = None
+
+  /**
+    * Replaces the built-in rule set with the given rule set.
+    */
+  def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceRuleSet)
+    ruleSets = List(replaceRuleSet)
+    replaceRules = true
+    this
+  }
+
+  /**
+    * Appends the given rule set to the built-in rule set.
+    */
+  def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedRuleSet)
+    ruleSets = addedRuleSet :: ruleSets
+    this
+  }
+
+  /**
+    * Replaces the built-in SQL operator table with the given table.
+    */
+  def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceSqlOperatorTable)
+    operatorTables = List(replaceSqlOperatorTable)
+    replaceOperatorTable = true
+    this
+  }
+
+  /**
+    * Appends the given table to the built-in SQL operator table.
+    */
+  def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedSqlOperatorTable)
+    this.operatorTables = addedSqlOperatorTable :: this.operatorTables
+    this
+  }
+
+  /**
+    * Replaces the built-in SQL parser configuration with the given configuration.
+    */
+  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(sqlParserConfig)
+    replaceSqlParserConfig = Some(sqlParserConfig)
+    this
+  }
+
+  private class CalciteConfigImpl(
+      val getRuleSet: Option[RuleSet],
+      val replacesRuleSet: Boolean,
+      val getSqlOperatorTable: Option[SqlOperatorTable],
+      val replacesSqlOperatorTable: Boolean,
+      val getSqlParserConfig: Option[SqlParser.Config])
+    extends CalciteConfig
+
+  /**
+    * Builds a new [[CalciteConfig]].
+    */
+  def build(): CalciteConfig = new CalciteConfigImpl(
+    ruleSets match {
+      case Nil => None
+      case h :: Nil => Some(h)
+      case _ =>
+        // concat rule sets
+        val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
+        Some(RuleSets.ofList(concatRules.asJava))
+    },
+    this.replaceRules,
+    operatorTables match {
+      case Nil => None
+      case h :: Nil => Some(h)
+      case _ =>
+        // chain operator tables
+        Some(operatorTables.reduce((x, y) => ChainedSqlOperatorTable.of(x, y)))
+    },
+    this.replaceOperatorTable,
+    replaceSqlParserConfig)
+}
+
+/**
+  * Defines a custom Calcite configuration for the Table and SQL API.
+  */
+trait CalciteConfig {
+  /**
+    * Returns whether this configuration replaces the built-in rule set.
+    */
+  def replacesRuleSet: Boolean
+
+  /**
+    * Returns a custom rule set.
+    */
+  def getRuleSet: Option[RuleSet]
+
+  /**
+    * Returns whether this configuration replaces the built-in SQL operator table.
+    */
+  def replacesSqlOperatorTable: Boolean
+
+  /**
+    * Returns a custom SQL operator table.
+    */
+  def getSqlOperatorTable: Option[SqlOperatorTable]
+
+  /**
+    * Returns a custom SQL parser configuration.
+    */
+  def getSqlParserConfig: Option[SqlParser.Config]
+}
+
+object CalciteConfig {
+
+  val DEFAULT = createBuilder().build()
+
+  /**
+    * Creates a new builder for constructing a [[CalciteConfig]].
+    */
+  def createBuilder(): CalciteConfigBuilder = {
+    new CalciteConfigBuilder
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
new file mode 100644
index 0000000..b4a3c42
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.validate.{SqlConformance, SqlValidatorImpl}
+import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
+
+/**
+ * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
+ */
+class FlinkCalciteSqlValidator(
+    opTab: SqlOperatorTable,
+    catalogReader: CalciteCatalogReader,
+    typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
+        opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
+
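+  // Both overrides convert the logical row types of an INSERT statement to plain SQL types
+  // via JavaTypeFactory#toSql.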
+  override def getLogicalSourceRowType(
+      sourceRowType: RelDataType,
+      insert: SqlInsert): RelDataType = {
+    typeFactory.toSql(sourceRowType)
+  }
+
+  override def getLogicalTargetRowType(
+      targetRowType: RelDataType,
+      insert: SqlInsert): RelDataType = {
+    typeFactory.toSql(targetRowType)
+  }
+}