Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/24 19:23:35 UTC

[1/5] flink git commit: [FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.

Repository: flink
Updated Branches:
  refs/heads/master 976e03c1e -> fe2c61a28


[FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.

This closes #3386.
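
For reference, the kind of query this change enables; a minimal sketch mirroring the new
SqlITCase tests further below (the table T1 with fields a, b, c and the event-time setup are
taken from those tests):

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // T1 is registered from a timestamped source, as in the ITCases below
    val sql = "SELECT a, b, c, " +
      "SUM(b) OVER (PARTITION BY a ORDER BY rowtime() " +
      "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
      "FROM T1"
    val result = tEnv.sql(sql).toDataStream[Row]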


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2c61a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2c61a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2c61a2

Branch: refs/heads/master
Commit: fe2c61a28e6a5300b2cf4c1e50ee884b51ef42c9
Parents: 7a9d39f
Author: hongyuhong 00223286 <ho...@huawei.com>
Authored: Fri Mar 24 09:31:59 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    |  62 +++--
 .../table/runtime/aggregate/AggregateUtil.scala |  70 +++--
 .../UnboundedEventTimeOverProcessFunction.scala | 224 ++++++++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 263 ++++++++++++++++++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  64 ++++-
 5 files changed, 634 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 547c875..3dd7ee2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -104,7 +104,7 @@ class DataStreamOverAggregate(
       case _: ProcTimeType =>
         // proc-time OVER window
         if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // non-bounded OVER window
+          // unbounded preceding OVER window
           createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
         } else if (
           overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
@@ -126,23 +126,15 @@ class DataStreamOverAggregate(
         }
       case _: RowTimeType =>
         // row-time OVER window
-        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // non-bounded OVER window
-          if (overWindow.isRows) {
-            // ROWS clause unbounded OVER window
-            throw new TableException(
-              "ROWS clause unbounded row-time OVER window no supported yet.")
-          } else {
-            // RANGE clause unbounded OVER window
-            throw new TableException(
-              "RANGE clause unbounded row-time OVER window no supported yet.")
-          }
-        } else if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
-            overWindow.upperBound.isCurrentRow) {
+        if (overWindow.lowerBound.isPreceding &&
+              overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          // unbounded preceding OVER window
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
+        } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           if (overWindow.isRows) {
             // ROWS clause bounded OVER window
-            createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, true)
+            createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, isRowTimeType = true)
           } else {
             // RANGE clause bounded OVER window
             throw new TableException(
@@ -187,7 +179,7 @@ class DataStreamOverAggregate(
         val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
           namedAggregates,
           inputType,
-          false)
+          isPartitioned = false)
 
         inputDS
           .process(processFunction).setParallelism(1).setMaxParallelism(1)
@@ -205,7 +197,6 @@ class DataStreamOverAggregate(
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
-    val inputFields = (0 until inputType.getFieldCount).toArray
 
     val precedingOffset =
       getLowerBoundary(logicWindow, overWindow, getInput()) + 1
@@ -216,7 +207,6 @@ class DataStreamOverAggregate(
     val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
       namedAggregates,
       inputType,
-      inputFields,
       precedingOffset,
       isRowTimeType
     )
@@ -244,6 +234,42 @@ class DataStreamOverAggregate(
     result
   }
 
+  def createUnboundedAndCurrentRowEventTimeOverWindow(
+    inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
+      namedAggregates,
+      inputType)
+
+    val result: DataStream[Row] =
+      // partitioned aggregation
+      if (partitionKeys.nonEmpty) {
+        inputDS.keyBy(partitionKeys: _*)
+          .process(processFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
+      // global non-partitioned aggregation
+      else {
+        inputDS.keyBy(new NullByteKeySelector[Row])
+          .process(processFunction)
+          .setParallelism(1)
+          .setMaxParallelism(1)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
+    result
+  }
+
   private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = {
     val overWindow: Group = logicWindow.groups.get(0)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 0084ee5..fdac692 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -104,7 +104,6 @@ object AggregateUtil {
   private[flink] def createRowsClauseBoundedOverProcessFunction(
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
-    inputFields: Array[Int],
     precedingOffset: Long,
     isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
 
@@ -114,26 +113,49 @@ object AggregateUtil {
         inputType,
         needRetraction = true)
 
-    val aggregationStateType: RowTypeInfo =
-      createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+    val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
 
-    val inputRowType: RowTypeInfo =
-      createDataSetAggregateBufferDataType(inputFields, Array(), inputType)
+    if (isRowTimeType) {
+      new RowsClauseBoundedOverProcessFunction(
+        aggregates,
+        aggFields,
+        inputType.getFieldCount,
+        aggregationStateType,
+        inputRowType,
+        precedingOffset
+      )
+    } else {
+      throw TableException(
+        "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+    }
+  }
 
-      val processFunction = if (isRowTimeType) {
-        new RowsClauseBoundedOverProcessFunction(
-          aggregates,
-          aggFields,
-          inputType.getFieldCount,
-          aggregationStateType,
-          inputRowType,
-          precedingOffset
-        )
-      } else {
-        throw TableException(
-          "Bounded partitioned proc-time OVER aggregation is not supported yet.")
-      }
-      processFunction
+  /**
+    * Create a [[ProcessFunction]] for unbounded event-time OVER windows to evaluate the final
+    * aggregate value.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType Input row type
+    * @return [[UnboundedEventTimeOverProcessFunction]]
+    */
+  private[flink] def createUnboundedEventTimeOverProcessFunction(
+   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+   inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        needRetraction = false)
+
+    val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+    new UnboundedEventTimeOverProcessFunction(
+      aggregates,
+      aggFields,
+      inputType.getFieldCount,
+      aggregationStateType,
+      FlinkTypeFactory.toInternalRowTypeInfo(inputType))
   }
 
   /**
@@ -595,7 +617,7 @@ object AggregateUtil {
       // compute preaggregation type
       val preAggFieldTypes = gkeyInFields
         .map(inputType.getFieldList.get(_).getType)
-        .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(inputType, aggregates)
+        .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(aggregates)
       val preAggRowType = new RowTypeInfo(preAggFieldTypes: _*)
 
       (
@@ -701,7 +723,7 @@ object AggregateUtil {
 
     val aggResultTypes = namedAggregates.map(a => FlinkTypeFactory.toTypeInfo(a.left.getType))
 
-    val accumulatorRowType = createAccumulatorRowType(inputType, aggregates)
+    val accumulatorRowType = createAccumulatorRowType(aggregates)
     val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
     val aggFunction = new AggregateAggFunction(aggregates, aggFields)
 
@@ -1029,7 +1051,6 @@ object AggregateUtil {
   }
 
   private def createAccumulatorType(
-      inputType: RelDataType,
       aggregates: Array[TableAggregateFunction[_]]): Seq[TypeInformation[_]] = {
 
     val aggTypes: Seq[TypeInformation[_]] =
@@ -1068,7 +1089,7 @@ object AggregateUtil {
         .map(FlinkTypeFactory.toTypeInfo)
 
     // get all field data types of all intermediate aggregates
-    val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+    val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
 
     // concat group key types, aggregation types, and window key types
     val allFieldTypes: Seq[TypeInformation[_]] = windowKeyTypes match {
@@ -1079,10 +1100,9 @@ object AggregateUtil {
   }
 
   private def createAccumulatorRowType(
-      inputType: RelDataType,
       aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
 
-    val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+    val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
 
     new RowTypeInfo(aggTypes: _*)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
new file mode 100644
index 0000000..7616ede
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time OVER windows.
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the count of forwarded input fields
+  * @param intermediateType the intermediate row type which is kept as state
+  * @param inputType the input row type which is kept as state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val intermediateType: TypeInformation[Row],
+    private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  // state to hold the accumulators of the aggregations
+  private var accumulatorState: ValueState[Row] = _
+  // state to hold rows until the next watermark arrives
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  // list to sort timestamps to access rows in timestamp order
+  private var sortedTimestamps: util.LinkedList[Long] = _
+
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    sortedTimestamps = new util.LinkedList[Long]()
+
+    // initialize accumulator state
+    val accDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+    accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
+
+    // initialize row state
+    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  /**
+    * Puts an element from the input stream into state if it is not late.
+    * Registers a timer for the next watermark.
+    *
+    * @param input The input value.
+    * @param ctx   The context to register timers or get the current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement(
+     input: Row,
+     ctx:  ProcessFunction[Row, Row]#Context,
+     out: Collector[Row]): Unit = {
+
+    val timestamp = ctx.timestamp()
+    val curWatermark = ctx.timerService().currentWatermark()
+
+    // discard late records; only records that are not late are buffered
+    if (timestamp >= curWatermark) {
+      // ensure every key just registers one timer
+      ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+
+      // put row into state
+      var rowList = rowMapState.get(timestamp)
+      if (rowList == null) {
+        rowList = new util.ArrayList[Row]()
+      }
+      rowList.add(input)
+      rowMapState.put(timestamp, rowList)
+    }
+  }
+
+  /**
+    * Called when a watermark arrives.
+    * Sorts records according to their timestamp, computes aggregates, and emits all records
+    * with a timestamp smaller than the watermark in timestamp order.
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The context to register timers or get the current time
+    * @param out       The collector for returning result values.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[Row, Row]#OnTimerContext,
+      out: Collector[Row]): Unit = {
+
+    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
+    val collector = out.asInstanceOf[TimestampedCollector[Row]]
+
+    val keyIterator = rowMapState.keys.iterator
+    if (keyIterator.hasNext) {
+      val curWatermark = ctx.timerService.currentWatermark
+      var existEarlyRecord: Boolean = false
+      var i = 0
+
+      // sort the record timestamps
+      do {
+        val recordTime = keyIterator.next
+        // only take timestamps smaller/equal to the watermark
+        if (recordTime <= curWatermark) {
+          insertToSortedList(recordTime)
+        } else {
+          existEarlyRecord = true
+        }
+      } while (keyIterator.hasNext)
+
+      // get last accumulator
+      var lastAccumulator = accumulatorState.value
+      if (lastAccumulator == null) {
+        // initialize accumulator
+        lastAccumulator = new Row(aggregates.length)
+        while (i < aggregates.length) {
+          lastAccumulator.setField(i, aggregates(i).createAccumulator())
+          i += 1
+        }
+      }
+
+      // emit the rows in order
+      while (!sortedTimestamps.isEmpty) {
+        val curTimestamp = sortedTimestamps.removeFirst()
+        val curRowList = rowMapState.get(curTimestamp)
+        collector.setAbsoluteTimestamp(curTimestamp)
+
+        var j = 0
+        while (j < curRowList.size) {
+          val curRow = curRowList.get(j)
+          i = 0
+
+          // copy forwarded fields to output row
+          while (i < forwardedFieldCount) {
+            output.setField(i, curRow.getField(i))
+            i += 1
+          }
+
+          // update accumulators and copy aggregates to output row
+          i = 0
+          while (i < aggregates.length) {
+            val index = forwardedFieldCount + i
+            val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+            aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+            output.setField(index, aggregates(i).getValue(accumulator))
+            i += 1
+          }
+          // emit output row
+          collector.collect(output)
+          j += 1
+        }
+        rowMapState.remove(curTimestamp)
+      }
+
+      accumulatorState.update(lastAccumulator)
+
+      // if there are rows with timestamp > watermark, register a timer for the next watermark
+      if (existEarlyRecord) {
+        ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+      }
+    }
+  }
+
+  /**
+   * Inserts timestamps in order into a linked list.
+   *
+   * If timestamps arrive in order (as is the case when using the RocksDB state backend), this
+   * is just an O(1) append.
+   */
+  private def insertToSortedList(recordTimeStamp: Long) = {
+    val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
+    var continue = true
+    while (listIterator.hasPrevious && continue) {
+      val timestamp = listIterator.previous
+      if (recordTimeStamp >= timestamp) {
+        listIterator.next
+        listIterator.add(recordTimeStamp)
+        continue = false
+      }
+    }
+
+    if (continue) {
+      sortedTimestamps.addFirst(recordTimeStamp)
+    }
+  }
+
+}
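
A small standalone illustration of the ordered insert above (plain java.util.LinkedList, no Flink
dependencies; names are illustrative): timestamps that arrive in ascending order only take the
single-step append path, while an out-of-order timestamp walks back to its slot.

    import java.util

    object SortedInsertSketch {
      def insert(list: util.LinkedList[Long], ts: Long): Unit = {
        val it = list.listIterator(list.size)
        var continue = true
        while (it.hasPrevious && continue) {
          if (ts >= it.previous()) {
            it.next()
            it.add(ts)
            continue = false
          }
        }
        if (continue) list.addFirst(ts)
      }

      def main(args: Array[String]): Unit = {
        val l = new util.LinkedList[Long]()
        Seq(14000000L, 14000002L, 14000003L).foreach(insert(l, _)) // ascending: one step each
        insert(l, 14000001L)                                       // out of order: walks back to its slot
        println(l) // [14000000, 14000001, 14000002, 14000003]
      }
    }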

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 19350a7..34a68b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.{TableEnvironment, TableException}
@@ -436,6 +436,266 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.execute()
   }
 
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      // the next 3 elements are late
+      Left(14000008L, (1, 4L, "Hello world")),
+      Left(14000008L, (2, 3L, "Hello world")),
+      Left(14000008L, (3, 3L, "Hello world")),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      // the next 3 elements are late
+      Left(14000019L, (1, 6L, "Hello world")),
+      Left(14000018L, (2, 4L, "Hello world")),
+      Left(14000018L, (3, 4L, "Hello world")),
+      Left(14000022L, (2, 5L, "Hello world")),
+      Left(14000022L, (3, 5L, "Hello world")),
+      Left(14000024L, (1, 7L, "Hello world")),
+      Left(14000023L, (1, 8L, "Hello world")),
+      Left(14000021L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test sliding event-time unbounded window with partition by (default parallelism) **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Left(14000024L, (3, 5L, "Hello world")),
+      Left(14000026L, (1, 7L, "Hello world")),
+      Left(14000025L, (1, 8L, "Hello world")),
+      Left(14000022L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,Hello,2,1,2,2,2",
+      "1,3,Hello world,5,2,2,3,2",
+      "1,1,Hi,6,3,2,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,3,2,1,2,1",
+      "3,1,Hello,1,1,1,1,1",
+      "3,2,Hello world,3,2,1,2,1",
+      "1,5,Hello world,11,4,2,5,1",
+      "1,6,Hello world,17,5,3,6,1",
+      "1,9,Hello world,26,6,4,9,1",
+      "1,8,Hello world,34,7,4,9,1",
+      "1,7,Hello world,41,8,5,9,1",
+      "2,5,Hello world,8,3,2,5,1",
+      "3,5,Hello world,8,3,2,5,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test sliding event-time unbounded window without partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      Left(14000004L, (3, 7L, "Hello world")),
+      Left(14000007L, (4, 9L, "Hello world")),
+      Left(14000008L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      // this element will be discarded because it is late
+      Left(14000008L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello,2,1,2,2,2",
+      "3,5,Hello,7,2,3,5,2",
+      "1,3,Hello,10,3,3,5,2",
+      "3,7,Hello world,17,4,4,7,2",
+      "1,1,Hi,18,5,3,7,1",
+      "4,9,Hello world,27,6,4,9,1",
+      "5,8,Hello world,35,7,5,9,1",
+      "6,8,Hello world,43,8,5,9,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test sliding event-time unbounded window without partition by and early arriving records **/
+  @Test
+  def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "from T1"
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      // next three elements are early
+      Left(14000012L, (3, 7L, "Hello world")),
+      Left(14000013L, (4, 9L, "Hello world")),
+      Left(14000014L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      Left(14000011L, (6, 8L, "Hello world")),
+      // next element is early
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Right(14000030L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello,2,1,2,2,2",
+      "3,5,Hello,7,2,3,5,2",
+      "1,3,Hello,10,3,3,5,2",
+      "1,1,Hi,11,4,2,5,1",
+      "6,8,Hello world,19,5,3,8,1",
+      "3,7,Hello world,26,6,4,8,1",
+      "4,9,Hello world,35,7,5,9,1",
+      "5,8,Hello world,43,8,5,9,1",
+      "6,8,Hello world,51,9,5,9,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object SqlITCase {
@@ -451,5 +711,4 @@ object SqlITCase {
 
     override def cancel(): Unit = ???
   }
-
 }
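
The EventTimeSourceFunction referenced by the tests above is only partially visible in this hunk
(the cancel() stub). A plausible sketch of its run() method, assuming it simply emits each Left
entry as a timestamped element and each Right entry as a watermark, consistent with how the test
data above is structured (imports as in the file above: SourceFunction, Watermark; the parameter
name is assumed):

    class EventTimeSourceFunction[T](
        dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
        dataWithTimestampList.foreach {
          // Left = (timestamp, element): emit the element with its event-time timestamp
          case Left((ts, element)) => ctx.collectWithTimestamp(element, ts)
          // Right = watermark timestamp: advance event time
          case Right(watermarkTs) => ctx.emitWatermark(new Watermark(watermarkTs))
        }
      }

      override def cancel(): Unit = ???
    }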

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 9a425b3..7b8b2df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -241,12 +241,67 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  def testUnboundNonPartitionedEventTimeWindowWithRange() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testUnboundPartitionedEventTimeWindowWithRange() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
   def testBoundPartitionedRowTimeWindowWithRow() = {
     val sql = "SELECT " +
-        "c, " +
-        "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
-        "CURRENT ROW) as cnt1 " +
-        "from MyTable"
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
 
     val expected =
       unaryNode(
@@ -294,4 +349,5 @@ class WindowAggregateTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
 }


[4/5] flink git commit: [FLINK-5990] [table] Add event-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.

Posted by fh...@apache.org.
[FLINK-5990] [table] Add event-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.

This closes #3585.
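
For reference, the kind of query this commit enables, mirroring the testBoundPartitionedRowTimeWindowWithRow
plan test visible in the WindowAggregateTest hunks earlier in this mail (MyTable and its fields a
and c are taken from that test):

    val sql = "SELECT c, " +
      "COUNT(a) OVER (PARTITION BY c ORDER BY RowTime() " +
      "ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS cnt1 " +
      "FROM MyTable"
    // the plan test verifies this via streamUtil.verifySql(sql, expected)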


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a9d39fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a9d39fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a9d39fe

Branch: refs/heads/master
Commit: 7a9d39fe9f659d43bf4719a2981f6c4771ffbe48
Parents: 6949c8c
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Sun Mar 19 23:31:00 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/nodes/OverAggregate.scala  |  31 ++-
 .../datastream/DataStreamOverAggregate.scala    | 149 +++++++++---
 .../table/runtime/aggregate/AggregateUtil.scala |  48 +++-
 .../RowsClauseBoundedOverProcessFunction.scala  | 239 +++++++++++++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 139 ++++++++++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  55 +++++
 6 files changed, 623 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
index 793ab23..91c8cef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -18,12 +18,15 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.{RelFieldCollation, RelNode}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rex.RexInputRef
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+
 import scala.collection.JavaConverters._
 
 trait OverAggregate {
@@ -46,8 +49,16 @@ trait OverAggregate {
     orderingString
   }
 
-  private[flink] def windowRange(overWindow: Group): String = {
-    s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+  private[flink] def windowRange(
+    logicWindow: Window,
+    overWindow: Group,
+    input: RelNode): String = {
+    if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded) {
+      s"BETWEEN ${getLowerBoundary(logicWindow, overWindow, input)} PRECEDING " +
+          s"AND ${overWindow.upperBound}"
+    } else {
+      s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+    }
   }
 
   private[flink] def aggregationToString(
@@ -92,4 +103,18 @@ trait OverAggregate {
     }.mkString(", ")
   }
 
+  private[flink] def getLowerBoundary(
+    logicWindow: Window,
+    overWindow: Group,
+    input: RelNode): Long = {
+
+    val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
+    val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex
+    val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
+    lowerBound match {
+      case x: java.math.BigDecimal => x.longValue()
+      case _ => lowerBound.asInstanceOf[Long]
+    }
+  }
+
 }
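
For a bound like ROWS BETWEEN 5 PRECEDING AND CURRENT ROW, the lower bound literal is held in
logicWindow.constants and referenced by a RexInputRef, and getLowerBoundary above resolves it to a
Long; DataStreamOverAggregate then passes getLowerBoundary(...) + 1 as the precedingOffset. A
minimal standalone sketch of the BigDecimal-to-Long narrowing only (the value 5 is an assumed
example, not the actual Calcite lookup):

    object LowerBoundarySketch {
      def main(args: Array[String]): Unit = {
        // window bound constants may be materialized as java.math.BigDecimal
        val lowerBound: AnyRef = java.math.BigDecimal.valueOf(5) // e.g. the "5 PRECEDING" literal
        val offset: Long = lowerBound match {
          case d: java.math.BigDecimal => d.longValue()
          case other => other.asInstanceOf[Long]
        }
        println(offset) // 5
      }
    }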

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 34b3b0f..547c875 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -32,6 +32,7 @@ import org.apache.calcite.rel.core.Window
 import org.apache.calcite.rel.core.Window.Group
 import java.util.{List => JList}
 
+import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 
@@ -70,9 +71,9 @@ class DataStreamOverAggregate(
 
     super.explainTerms(pw)
       .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
-        .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
-      .itemIf("rows", windowRange(overWindow), overWindow.isRows)
-      .itemIf("range", windowRange(overWindow), !overWindow.isRows)
+      .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
+      .itemIf("rows", windowRange(logicWindow, overWindow, getInput), overWindow.isRows)
+      .itemIf("range", windowRange(logicWindow, overWindow, getInput), !overWindow.isRows)
       .item(
         "select", aggregationToString(
           inputType,
@@ -99,20 +100,58 @@ class DataStreamOverAggregate(
       .getFieldList
       .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
       .getValue
-
     timeType match {
       case _: ProcTimeType =>
-        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
-        if (overWindow.lowerBound.isUnbounded &&
-          overWindow.upperBound.isCurrentRow) {
+        // proc-time OVER window
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          // non-bounded OVER window
           createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+        } else if (
+          overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+              overWindow.upperBound.isCurrentRow) {
+          // bounded OVER window
+          if (overWindow.isRows) {
+            // ROWS clause bounded OVER window
+            throw new TableException(
+              "ROWS clause bounded proc-time OVER window not supported yet.")
+          } else {
+            // RANGE clause bounded OVER window
+            throw new TableException(
+              "RANGE clause bounded proc-time OVER window not supported yet.")
+          }
         } else {
           throw new TableException(
-              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
-              "condition.")
+            "OVER window only supports ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
+                "condition.")
         }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        // row-time OVER window
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          // non-bounded OVER window
+          if (overWindow.isRows) {
+            // ROWS clause unbounded OVER window
+            throw new TableException(
+              "ROWS clause unbounded row-time OVER window no supported yet.")
+          } else {
+            // RANGE clause unbounded OVER window
+            throw new TableException(
+              "RANGE clause unbounded row-time OVER window no supported yet.")
+          }
+        } else if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+            overWindow.upperBound.isCurrentRow) {
+          // bounded OVER window
+          if (overWindow.isRows) {
+            // ROWS clause bounded OVER window
+            createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, true)
+          } else {
+            // RANGE clause bounded OVER window
+            throw new TableException(
+              "RANGE clause bounded row-time OVER window not supported yet.")
+          }
+        } else {
+          throw new TableException(
+            "row-time OVER window only supports CURRENT ROW condition.")
+        }
       case _ =>
         throw new TableException(s"Unsupported time type {$timeType}")
     }
@@ -120,7 +159,7 @@ class DataStreamOverAggregate(
   }
 
   def createUnboundedAndCurrentRowProcessingTimeOverWindow(
-    inputDS: DataStream[Row]): DataStream[Row]  = {
+    inputDS: DataStream[Row]): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -130,32 +169,78 @@ class DataStreamOverAggregate(
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
     val result: DataStream[Row] =
-        // partitioned aggregation
-        if (partitionKeys.nonEmpty) {
-          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-            namedAggregates,
-            inputType)
+    // partitioned aggregation
+      if (partitionKeys.nonEmpty) {
+        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+          namedAggregates,
+          inputType)
 
-          inputDS
+        inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
           .returns(rowTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[Row]]
-        }
-        // non-partitioned aggregation
-        else {
-          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-            namedAggregates,
-            inputType,
-            false)
-
-          inputDS
-            .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
-            .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
-        }
+      }
+      // non-partitioned aggregation
+      else {
+        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+          namedAggregates,
+          inputType,
+          false)
+
+        inputDS
+          .process(processFunction).setParallelism(1).setMaxParallelism(1)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
+    result
+  }
+
+  def createRowsClauseBoundedAndCurrentRowOverWindow(
+    inputDS: DataStream[Row],
+    isRowTimeType: Boolean = false): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+    val inputFields = (0 until inputType.getFieldCount).toArray
+
+    val precedingOffset =
+      getLowerBoundary(logicWindow, overWindow, getInput()) + 1
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
+      namedAggregates,
+      inputType,
+      inputFields,
+      precedingOffset,
+      isRowTimeType
+    )
+    val result: DataStream[Row] =
+    // partitioned aggregation
+      if (partitionKeys.nonEmpty) {
+        inputDS
+          .keyBy(partitionKeys: _*)
+          .process(processFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
+      // non-partitioned aggregation
+      else {
+        inputDS
+          .keyBy(new NullByteKeySelector[Row])
+          .process(processFunction)
+          .setParallelism(1)
+          .setMaxParallelism(1)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
     result
   }
 
@@ -180,7 +265,7 @@ class DataStreamOverAggregate(
       }
     }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
       s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
-      s"${windowRange(overWindow)}, " +
+      s"${windowRange(logicWindow, overWindow, getInput)}, " +
       s"select: (${
         aggregationToString(
           inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 9feec17..0084ee5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -61,7 +61,7 @@ object AggregateUtil {
     * @param isPartitioned Flag to indicate whether the input is partitioned or not
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
-  private[flink] def CreateUnboundedProcessingOverProcessFunction(
+  private[flink] def createUnboundedProcessingOverProcessFunction(
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
     isPartitioned: Boolean = true): ProcessFunction[Row, Row] = {
@@ -91,6 +91,52 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
+    * bounded OVER windows to evaluate the final aggregate value.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType       Input row type
+    * @param inputFields     All input fields
+    * @param precedingOffset the preceding offset
+    * @param isRowTimeType   Flag that indicates whether the time type is row time
+    * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+    */
+  private[flink] def createRowsClauseBoundedOverProcessFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    inputFields: Array[Int],
+    precedingOffset: Long,
+    isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        needRetraction = true)
+
+    val aggregationStateType: RowTypeInfo =
+      createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+
+    val inputRowType: RowTypeInfo =
+      createDataSetAggregateBufferDataType(inputFields, Array(), inputType)
+
+      val processFunction = if (isRowTimeType) {
+        new RowsClauseBoundedOverProcessFunction(
+          aggregates,
+          aggFields,
+          inputType.getFieldCount,
+          aggregationStateType,
+          inputRowType,
+          precedingOffset
+        )
+      } else {
+        throw TableException(
+          "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+      }
+      processFunction
+  }
+
+  /**
     * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
     * The output of the function contains the grouping keys and the timestamp and the intermediate
     * aggregate values of all aggregate function. The timestamp field is aligned to time window

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
new file mode 100644
index 0000000..1678d57
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RowsClauseBoundedOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo,
+    private val inputRowType: RowTypeInfo,
+    private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which keeps the count of data
+  private var dataCountState: ValueState[Long] = _
+
+  // the state which is used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The map key is the timestamp. For each timestamp, the map value is a list that contains
+  // all the rows belonging to this timestamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val dataCountStateDescriptor =
+      new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+    dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    // timestamp of the current input row; used as the event-time timer that triggers the calculation
+    val triggeringTs = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+    // check if the row is late, i.e., a timer with an equal or later timestamp has already fired;
+    // late rows are dropped, on-time rows are buffered and an event-time timer is registered
+
+    if (triggeringTs > lastTriggeringTs) {
+      val data = dataState.get(triggeringTs)
+      if (null != data) {
+        data.add(input)
+        dataState.put(triggeringTs, data)
+      } else {
+        val data = new util.ArrayList[Row]
+        data.add(input)
+        dataState.put(triggeringTs, data)
+        // register event time timer
+        ctx.timerService.registerEventTimeTimer(triggeringTs)
+      }
+    }
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    // gets all rows that arrived with this timestamp from the state
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (null != inputs) {
+
+      var accumulators = accumulatorState.value
+      var dataCount = dataCountState.value
+
+      var retractList: JList[Row] = null
+      var retractTs: Long = Long.MaxValue
+      var retractCnt: Int = 0
+      var j = 0
+      var i = 0
+
+      while (j < inputs.size) {
+        val input = inputs.get(j)
+
+        // initialize the accumulators for this key on the first run or after failover recovery
+        if (null == accumulators) {
+          accumulators = new Row(aggregates.length)
+          i = 0
+          while (i < aggregates.length) {
+            accumulators.setField(i, aggregates(i).createAccumulator())
+            i += 1
+          }
+        }
+
+        var retractRow: Row = null
+
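+        // if the window already holds precedingOffset rows, the oldest row (smallest
+        // timestamp in dataState) has to be retracted from the accumulators before the
+        // current row is accumulated; otherwise the window is still growing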
+        if (dataCount >= precedingOffset) {
+          if (null == retractList) {
+            // find the smallest timestamp
+            retractTs = Long.MaxValue
+            val dataTimestampIt = dataState.keys.iterator
+            while (dataTimestampIt.hasNext) {
+              val dataTs = dataTimestampIt.next
+              if (dataTs < retractTs) {
+                retractTs = dataTs
+              }
+            }
+            // get the oldest rows to retract them
+            retractList = dataState.get(retractTs)
+          }
+
+          retractRow = retractList.get(retractCnt)
+          retractCnt += 1
+
+          // remove retracted values from state
+          if (retractList.size == retractCnt) {
+            dataState.remove(retractTs)
+            retractList = null
+            retractCnt = 0
+          }
+        } else {
+          dataCount += 1
+        }
+
+        // copy forwarded fields to output row
+        i = 0
+        while (i < forwardedFieldCount) {
+          output.setField(i, input.getField(i))
+          i += 1
+        }
+
+        // retract old row from accumulators
+        if (null != retractRow) {
+          i = 0
+          while (i < aggregates.length) {
+            val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+            aggregates(i).retract(accumulator, retractRow.getField(aggFields(i)))
+            i += 1
+          }
+        }
+
+        // accumulate current row and set aggregate in output row
+        i = 0
+        while (i < aggregates.length) {
+          val index = forwardedFieldCount + i
+          val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+          aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+          output.setField(index, aggregates(i).getValue(accumulator))
+          i += 1
+        }
+        j += 1
+
+        out.collect(output)
+      }
+
+      // update all states
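+      // if the oldest timestamp was only partially retracted, drop the retracted rows
+      // from its list but keep the remaining rows in state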
+      if (dataState.contains(retractTs)) {
+        if (retractCnt > 0) {
+          retractList.subList(0, retractCnt).clear()
+          dataState.put(retractTs, retractList)
+        }
+      }
+      dataCountState.update(dataCount)
+      accumulatorState.update(accumulators)
+    }
+
+    lastTriggeringTsState.update(timestamp)
+  }
+}
+
+
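
As a rough sketch of how a process function of this shape can be applied to a keyed DataStream
(not taken from this commit; identifiers such as `inputDS`, `partitionKey`, or `returnRowType`
are illustrative assumptions):

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.RowTypeInfo
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.functions.AggregateFunction
  import org.apache.flink.types.Row

  def applyRowsBoundedOver(
      inputDS: DataStream[Row],
      partitionKey: Int,
      aggregates: Array[AggregateFunction[_]],
      aggFields: Array[Int],
      forwardedFieldCount: Int,
      accumulatorRowType: RowTypeInfo,
      inputRowType: RowTypeInfo,
      returnRowType: RowTypeInfo,
      precedingOffset: Long): DataStream[Row] = {

    // the Scala API needs an explicit TypeInformation[Row] for the result stream
    implicit val returnTypeInfo: TypeInformation[Row] = returnRowType

    inputDS
      .keyBy(partitionKey) // PARTITION BY key of the OVER window
      .process(
        new RowsClauseBoundedOverProcessFunction(
          aggregates,
          aggFields,
          forwardedFieldCount,
          accumulatorRowType,
          inputRowType,
          precedingOffset))
  }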

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index d5a140a..19350a7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,14 +19,18 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamingWithStateTestBase, StreamITCase,
-StreamTestData}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
 import scala.collection.mutable
 
@@ -293,6 +297,120 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
+    val data = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      " from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5","Hello,2,3,6",
+      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15",
+      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testBoundNonPartitionedEventTimeWindowWithRow(): Unit = {
+
+    val data = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env
+      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
+      "sum(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   /**
     *  All aggregates must be computed on the same window.
     */
@@ -317,4 +435,21 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
   }
+
+}
+
+object SqlITCase {
+
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    override def cancel(): Unit = ???
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index a25e59c..9a425b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -239,4 +239,59 @@ class WindowAggregateTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testBoundPartitionedRowTimeWindowWithRow() = {
+    val sql = "SELECT " +
+        "c, " +
+        "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+        "CURRENT ROW) as cnt1 " +
+        "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testBoundNonPartitionedRowTimeWindowWithRow() = {
+    val sql = "SELECT " +
+        "c, " +
+        "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+        "CURRENT ROW) as cnt1 " +
+        "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
 }


[3/5] flink git commit: [FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.

Posted by fh...@apache.org.
[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.

This closes #3409.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/135a57c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/135a57c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/135a57c4

Branch: refs/heads/master
Commit: 135a57c4bb37eaa9cb85faaff1cc694f9448fabd
Parents: 976e03c
Author: jingzhang <be...@126.com>
Authored: Thu Mar 16 11:24:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  37 +++++
 .../flink/table/api/TableEnvironment.scala      | 104 ++++++++++--
 .../org/apache/flink/table/api/exceptions.scala |  62 +++++--
 .../table/catalog/ExternalCatalogSchema.scala   |  14 +-
 .../flink/table/plan/logical/operators.scala    |   4 +-
 .../flink/table/ExternalCatalogTest.scala       | 161 +++++++++++++++++++
 .../catalog/ExternalCatalogSchemaTest.scala     |   5 +-
 7 files changed, 342 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 03b916c..117f32f 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers")
 </div>
 </div>
 
+Registering External Catalogs
+--------------------------------
+
+An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows: 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog)
+
+{% endhighlight %}
+</div>
+</div>
+
+Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path in the form `catalog.database.table`.
+
+Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
 
 Table API
 ----------
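
As a small illustration of the full-path access described above (the "Customers" catalog name is
taken from the registration example; the database and table names are assumptions):

  // scan a table of the registered catalog via the Table API
  val users: Table = tableEnv.scan("Customers", "prod", "users")

  // or reference it from SQL by its fully qualified name
  val result: Table = tableEnv.sql("SELECT * FROM Customers.prod.users")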

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..bb4c3ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
 import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
@@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable.HashMap
+import _root_.scala.annotation.varargs
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   // the catalog to hold all registered and translated tables
   // we disable caching here to prevent side effects
   private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false)
-  private val tables: SchemaPlus = internalSchema.plus()
+  private val rootSchema: SchemaPlus = internalSchema.plus()
 
   // Table API/SQL function catalog
   private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
@@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   // the configuration to create a Calcite planner
   private lazy val frameworkConfig: FrameworkConfig = Frameworks
     .newConfigBuilder
-    .defaultSchema(tables)
+    .defaultSchema(rootSchema)
     .parserConfig(getSqlParserConfig)
     .costFactory(new DataSetCostFactory)
     .typeSystem(new FlinkTypeSystem)
@@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) {
   // a counter for unique attribute names
   private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
+  // registered external catalog names -> catalog
+  private val externalCatalogs = new HashMap[String, ExternalCatalog]
+
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig = config
 
@@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
+    * All tables registered in the [[ExternalCatalog]] can be accessed.
+    *
+    * @param name            The name under which the externalCatalog will be registered
+    * @param externalCatalog The externalCatalog to register
+    */
+  def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+    if (rootSchema.getSubSchema(name) != null) {
+      throw new ExternalCatalogAlreadyExistException(name)
+    }
+    this.externalCatalogs.put(name, externalCatalog)
+    // create a Calcite schema for the external catalog and register it at the root schema
+    ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+  }
+
+  /**
+    * Gets a registered [[ExternalCatalog]] by name.
+    *
+    * @param name The name to look up the [[ExternalCatalog]]
+    * @return The [[ExternalCatalog]]
+    */
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
+    this.externalCatalogs.get(name) match {
+      case Some(catalog) => catalog
+      case None => throw new ExternalCatalogNotExistException(name)
+    }
+  }
+
+  /**
     * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
     * user-defined functions under this name.
     */
@@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     checkForInstantiation(function.getClass)
 
     // register in Table API
+
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
@@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
 
     if (isRegistered(name)) {
-      tables.add(name, table)
+      rootSchema.add(name, table)
     } else {
       throw new TableException(s"Table \'$name\' is not registered.")
     }
@@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Scans a registered table and returns the resulting [[Table]].
     *
-    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+    * A table to scan must be registered in the TableEnvironment. It can either be directly
+    * registered as a DataStream, DataSet, or Table, or be a member of an [[ExternalCatalog]].
+    *
+    * Examples:
     *
-    * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The scanned table.
+    * - Scanning a directly registered table
+    * {{{
+    *   val tab: Table = tableEnv.scan("tableName")
+    * }}}
+    *
+    * - Scanning a table from a registered catalog
+    * {{{
+    *   val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
+    * }}}
+    *
+    * @param tablePath The path of the table to scan.
+    * @throws TableException if no table is found using the given table path.
+    * @return The resulting [[Table]].
     */
-  @throws[ValidationException]
-  def scan(tableName: String): Table = {
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    } else {
-      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+  @throws[TableException]
+  @varargs
+  def scan(tablePath: String*): Table = {
+    scanInternal(tablePath.toArray)
+  }
+
+  @throws[TableException]
+  private def scanInternal(tablePath: Array[String]): Table = {
+    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
+    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
+    val schema = getSchema(schemaPaths)
+    if (schema != null) {
+      val tableName = tablePath(tablePath.length - 1)
+      val table = schema.getTable(tableName)
+      if (table != null) {
+        return new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))
+      }
+    }
+    throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found.")
+  }
+
+  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
+    var schema = rootSchema
+    for (schemaName <- schemaPath) {
+      schema = schema.getSubSchema(schemaName)
+      if (schema == null) {
+        return schema
+      }
     }
+    schema
   }
 
   /**
@@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       throw new TableException(s"Table \'$name\' already exists. " +
         s"Please, choose a different name.")
     } else {
-      tables.add(name, table)
+      rootSchema.add(name, table)
     }
   }
 
@@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @return true, if a table is registered under the name, false otherwise.
     */
   protected def isRegistered(name: String): Boolean = {
-    tables.getTableNames.contains(name)
+    rootSchema.getTableNames.contains(name)
   }
 
   protected def getRowType(name: String): RelDataType = {
-    tables.getTable(name).getRowType(typeFactory)
+    rootSchema.getTable(name).getRowType(typeFactory)
   }
 
   /** Returns a unique temporary attribute name. */
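
A minimal usage sketch for the two new catalog methods (assuming the `InMemoryExternalCatalog`
shown in the documentation; the catalog name "test" is an illustrative assumption):

  val catalog: ExternalCatalog = new InMemoryExternalCatalog

  // register the catalog under a unique name
  tableEnv.registerExternalCatalog("test", catalog)

  // retrieve it again later; registering "test" twice throws ExternalCatalogAlreadyExistException,
  // looking up an unknown name throws ExternalCatalogNotExistException
  val registered: ExternalCatalog = tableEnv.getRegisteredExternalCatalog("test")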

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 8632436..760cf75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -75,34 +75,34 @@ object ValidationException {
 case class UnresolvedException(msg: String) extends RuntimeException(msg)
 
 /**
-  * Exception for operation on a nonexistent table
+  * Exception for an operation on a nonexistent table
   *
   * @param db    database name
   * @param table table name
-  * @param cause
+  * @param cause the cause
   */
 case class TableNotExistException(
     db: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"table $db.$table does not exist!", cause) {
+    extends RuntimeException(s"Table $db.$table does not exist.", cause) {
 
   def this(db: String, table: String) = this(db, table, null)
 
 }
 
 /**
-  * Exception for adding an already existed table
+  * Exception for adding an already existing table
   *
   * @param db    database name
   * @param table table name
-  * @param cause
+  * @param cause the cause
   */
 case class TableAlreadyExistException(
     db: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"table $db.$table already exists!", cause) {
+    extends RuntimeException(s"Table $db.$table already exists.", cause) {
 
   def this(db: String, table: String) = this(db, table, null)
 
@@ -112,56 +112,84 @@ case class TableAlreadyExistException(
   * Exception for operation on a nonexistent database
   *
   * @param db database name
-  * @param cause
+  * @param cause the cause
   */
 case class DatabaseNotExistException(
     db: String,
     cause: Throwable)
-    extends RuntimeException(s"database $db does not exist!", cause) {
+    extends RuntimeException(s"Database $db does not exist.", cause) {
 
   def this(db: String) = this(db, null)
 }
 
 /**
-  * Exception for adding an already existed database
+  * Exception for adding an already existing database
   *
   * @param db database name
-  * @param cause
+  * @param cause the cause
   */
 case class DatabaseAlreadyExistException(
     db: String,
     cause: Throwable)
-    extends RuntimeException(s"database $db already exists!", cause) {
+    extends RuntimeException(s"Database $db already exists.", cause) {
 
   def this(db: String) = this(db, null)
 }
 
 /**
-  * Exception for does not find any matched [[TableSourceConverter]] for a specified table type
+  * Exception for not finding a [[TableSourceConverter]] for a given table type.
   *
   * @param tableType table type
-  * @param cause
+  * @param cause the cause
   */
 case class NoMatchedTableSourceConverterException(
     tableType: String,
     cause: Throwable)
-    extends RuntimeException(s"find no table source converter matched table type $tableType!",
+    extends RuntimeException(s"Could not find a TableSourceConverter for table type $tableType.",
       cause) {
 
   def this(tableType: String) = this(tableType, null)
 }
 
 /**
-  * Exception for find more than one matched [[TableSourceConverter]] for a specified table type
+  * Exception for finding more than one [[TableSourceConverter]] for a given table type.
   *
   * @param tableType table type
-  * @param cause
+  * @param cause the cause
   */
 case class AmbiguousTableSourceConverterException(
     tableType: String,
     cause: Throwable)
-    extends RuntimeException(s"more than one table source converter matched table type $tableType!",
+    extends RuntimeException(s"More than one TableSourceConverter for table type $tableType.",
       cause) {
 
   def this(tableType: String) = this(tableType, null)
 }
+
+/**
+  * Exception for an operation on a nonexistent external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause the cause
+  */
+case class ExternalCatalogNotExistException(
+    catalogName: String,
+    cause: Throwable)
+    extends RuntimeException(s"External catalog $catalogName does not exist.", cause) {
+
+  def this(catalogName: String) = this(catalogName, null)
+}
+
+/**
+  * Exception for adding an already existing external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause the cause
+  */
+case class ExternalCatalogAlreadyExistException(
+    catalogName: String,
+    cause: Throwable)
+    extends RuntimeException(s"External catalog $catalogName already exists.", cause) {
+
+  def this(catalogName: String) = this(catalogName, null)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index e3ed96e..8e010fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -136,20 +136,18 @@ class ExternalCatalogSchema(
 object ExternalCatalogSchema {
 
   /**
-    * Creates a FlinkExternalCatalogSchema.
+    * Registers an external catalog in a Calcite schema.
     *
-    * @param parentSchema              Parent schema
-    * @param externalCatalogIdentifier External catalog identifier
-    * @param externalCatalog           External catalog object
-    * @return Created schema
+    * @param parentSchema              Parent schema into which the catalog is registered
+    * @param externalCatalogIdentifier Identifier of the external catalog
+    * @param externalCatalog           The external catalog to register
     */
-  def create(
+  def registerCatalog(
       parentSchema: SchemaPlus,
       externalCatalogIdentifier: String,
-      externalCatalog: ExternalCatalog): ExternalCatalogSchema = {
+      externalCatalog: ExternalCatalog): Unit = {
     val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
     val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
     newSchema.registerSubSchemas(schemaPlusOfNewSchema)
-    newSchema
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 1b5eafb..559bd75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -511,7 +511,7 @@ case class Join(
 }
 
 case class CatalogNode(
-    tableName: String,
+    tablePath: Array[String],
     rowType: RelDataType) extends LeafNode {
 
   val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
@@ -519,7 +519,7 @@ case class CatalogNode(
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
-    relBuilder.scan(tableName)
+    relBuilder.scan(tablePath.toIterable.asJava)
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = this

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
new file mode 100644
index 0000000..696468d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for query plans on tables from an external catalog.
+  */
+class ExternalCatalogTest extends TableTestBase {
+  private val table1Path: Array[String] = Array("test", "db1", "tb1")
+  private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+  private val table2Path: Array[String] = Array("test", "db2", "tb2")
+  private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tEnv.scan("test", "db1", "tb1")
+    val table2 = tEnv.scan("test", "db2", "tb2")
+    val result = table2
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    val util = batchTestUtil()
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tEnv.scan("test", "db1", "tb1")
+    val table2 = tEnv.scan("test", "db2", "tb2")
+
+    val result = table2.where("d < 3")
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+        term("where", "<(d, 3)")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    val util = streamTestUtil()
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+
+  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 6ffa8c6..b780a3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
 class ExternalCatalogSchemaTest {
 
   private val schemaName: String = "test"
-  private var externalCatalogSchema: ExternalCatalogSchema = _
+  private var externalCatalogSchema: SchemaPlus = _
   private var calciteCatalogReader: CalciteCatalogReader = _
   private val db = "db1"
   private val tb = "tb1"
@@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest {
   def setUp(): Unit = {
     val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
     val catalog = CommonTestData.getInMemoryTestCatalog
-    externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
+    ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+externalCatalogSchema = rootSchemaPlus.getSubSchema(schemaName)
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
     calciteCatalogReader = new CalciteCatalogReader(
       CalciteSchema.from(rootSchemaPlus),


[5/5] flink git commit: [hotfix] [table] Improved code documentation for external catalog.

Posted by fh...@apache.org.
[hotfix] [table] Improved code documentation for external catalog.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f97deaa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f97deaa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f97deaa9

Branch: refs/heads/master
Commit: f97deaa9683bf1868ecf104c73b997ede63e8856
Parents: 135a57c
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Mar 23 22:18:37 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 .../flink/table/annotation/TableType.java       |  6 +-
 .../table/catalog/CrudExternalCatalog.scala     | 86 ++++++++++----------
 .../flink/table/catalog/ExternalCatalog.scala   | 40 ++++-----
 .../table/catalog/ExternalCatalogDatabase.scala |  6 +-
 .../table/catalog/ExternalCatalogTable.scala    | 24 +++---
 .../table/catalog/TableSourceConverter.scala    | 23 +++---
 6 files changed, 94 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 1cebe53..3845eae 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -27,7 +27,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * A {@link TableSourceConverter} with this annotation bind the converter with table type.
+ * Annotates a table type of a {@link TableSourceConverter}.
  */
 @Documented
 @Target(ElementType.TYPE)
@@ -36,9 +36,9 @@ import java.lang.annotation.Target;
 public @interface TableType {
 
 	/**
-	 * Specifies the external catalog table type of {@link TableSourceConverter}.
+	 * Returns the table type of a {@link TableSourceConverter}.
 	 *
-	 * @return the external catalog table type of {@link TableSourceConverter}.
+	 * @return The table type of the {@link TableSourceConverter}.
 	 */
 	String value();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index d93f140..fcefa45 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -21,88 +21,86 @@ package org.apache.flink.table.catalog
 import org.apache.flink.table.api._
 
 /**
-  * This class is responsible for interact with external catalog.
-  * Its main responsibilities including:
-  * <ul>
-  * <li> create/drop/alter database or tables for DDL operations
-  * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
-  * </ul>
+  * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
   /**
-    * Adds table into external Catalog
+    * Adds a table to the catalog.
     *
-    * @param table          description of table which to create
-    * @param ignoreIfExists if table already exists in the catalog, not throw exception and leave
-    *                       the existed table if ignoreIfExists is true;
-    *                       else throw a TableAlreadyExistException.
-    * @throws DatabaseNotExistException  if database does not exist in the catalog yet
-    * @throws TableAlreadyExistException if table already exists in the catalog and
-    *                                    ignoreIfExists is false
+    * @param table          Description of the table to add
+    * @param ignoreIfExists Flag to specify behavior if a table with the given name already exists:
+    *                       if set to false, it throws a TableAlreadyExistException,
+    *                       if set to true, nothing happens.
+    * @throws DatabaseNotExistException  thrown if database does not exist
+    * @throws TableAlreadyExistException thrown if table already exists and ignoreIfExists is false
     */
   @throws[DatabaseNotExistException]
   @throws[TableAlreadyExistException]
   def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
 
   /**
-    * Deletes table from external Catalog
+    * Deletes a table from a database of the catalog.
     *
-    * @param dbName            database name
-    * @param tableName         table name
-    * @param ignoreIfNotExists if table not exist yet, not throw exception if ignoreIfNotExists is
-    *                          true; else throw TableNotExistException
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
-    * @throws TableNotExistException    if table does not exist in the catalog yet
+    * @param dbName            Name of the database
+    * @param tableName         Name of the table
+    * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @throws TableNotExistException    thrown if the table does not exist in the catalog
     */
   @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
   def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Modifies an existing table in the external catalog
+    * Modifies an existing table in the catalog.
     *
-    * @param table             description of table which to modify
-    * @param ignoreIfNotExists if the table not exist yet, not throw exception if ignoreIfNotExists
-    *                          is true; else throw TableNotExistException
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
-    * @throws TableNotExistException    if table does not exist in the catalog yet
+    * @param table             New description of the table to update
+    * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @throws TableNotExistException    thrown if the table does not exist in the catalog
     */
   @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
   def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Adds database into external Catalog
+    * Adds a database to the catalog.
     *
-    * @param db             description of database which to create
-    * @param ignoreIfExists if database already exists in the catalog, not throw exception and leave
-    *                       the existed database if ignoreIfExists is true;
-    *                       else throw a DatabaseAlreadyExistException.
-    * @throws DatabaseAlreadyExistException if database already exists in the catalog and
-    *                                       ignoreIfExists is false
+    * @param db             Description of the database to create
+    * @param ignoreIfExists Flag to specify behavior if a database with the given name already
+    *                       exists: if set to false, it throws a DatabaseAlreadyExistException,
+    *                       if set to true, nothing happens.
+    * @throws DatabaseAlreadyExistException thrown if the database already exists in the catalog
+    *                                       and ignoreIfExists is false
     */
   @throws[DatabaseAlreadyExistException]
   def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
 
   /**
-    * Deletes database from external Catalog
+    * Deletes a database from the catalog.
     *
-    * @param dbName            database name
-    * @param ignoreIfNotExists if database not exist yet, not throw exception if ignoreIfNotExists
-    *                          is true; else throw DatabaseNotExistException
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
+    * @param dbName            Name of the database.
+    * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
     */
   @throws[DatabaseNotExistException]
   def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Modifies existed database into external Catalog
+    * Modifies an existing database in the catalog.
     *
-    * @param db                description of database which to modify
-    * @param ignoreIfNotExists if database not exist yet, not throw exception if ignoreIfNotExists
-    *                          is true; else throw DatabaseNotExistException
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
+    * @param db                New description of the database to update
+    * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
     */
   @throws[DatabaseNotExistException]
   def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit

http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 58b62c7..00a35e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -23,49 +23,51 @@ import java.util.{List => JList}
 import org.apache.flink.table.api._
 
 /**
-  * This class is responsible for read table/database from external catalog.
-  * Its main responsibilities is provide tables for calcite catalog, it looks up databases or tables
-  * in the external catalog.
+  * An [[ExternalCatalog]] is the connector between an external database catalog and Flink's
+  * Table API.
+  *
+  * It provides information about databases and tables such as names, schema, statistics, and
+  * access information.
   */
 trait ExternalCatalog {
 
   /**
-    * Gets table from external Catalog
+    * Gets a table from the catalog.
     *
-    * @param dbName    database name
-    * @param tableName table name
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
-    * @throws TableNotExistException    if table does not exist in the catalog yet
-    * @return found table
+    * @param dbName    The name of the table's database.
+    * @param tableName The name of the table.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog.
+    * @throws TableNotExistException    thrown if the table does not exist in the catalog.
+    * @return the requested table
     */
   @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
   def getTable(dbName: String, tableName: String): ExternalCatalogTable
 
   /**
-    * Gets the table name lists from current external Catalog
+    * Gets a list of all table names of a database in the catalog.
     *
-    * @param dbName database name
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
-    * @return lists of table name
+    * @param dbName The name of the database.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @return The list of table names
     */
   @throws[DatabaseNotExistException]
   def listTables(dbName: String): JList[String]
 
   /**
-    * Gets database from external Catalog
+    * Gets a database from the catalog.
     *
-    * @param dbName database name
-    * @throws DatabaseNotExistException if database does not exist in the catalog yet
-    * @return found database
+    * @param dbName The name of the database.
+    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @return The requested database
     */
   @throws[DatabaseNotExistException]
   def getDatabase(dbName: String): ExternalCatalogDatabase
 
   /**
-    * Gets the database name lists from current external Catalog
+    * Gets a list of all databases in the catalog.
     *
-    * @return list of database names
+    * @return The list of database names
     */
   def listDatabases(): JList[String]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
index c2a4702..99ab2eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.catalog
 import java.util.{HashMap => JHashMap, Map => JMap}
 
 /**
-  * Database definition of the external catalog.
+  * Defines a database in an [[ExternalCatalog]].
   *
-  * @param dbName     database name
-  * @param properties database properties
+  * @param dbName     The name of the database
+  * @param properties The properties of the database
   */
 case class ExternalCatalogDatabase(
     dbName: String,

http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 893cbb3..4fdab66 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -26,16 +26,16 @@ import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.plan.stats.TableStats
 
 /**
-  * Table definition of the external catalog.
+  * Defines a table in an [[ExternalCatalog]].
   *
-  * @param identifier           identifier of external catalog table, including dbName and tableName
-  * @param tableType            type of external catalog table, e.g csv, hbase, kafka
-  * @param schema               schema of table data, including column names and column types
-  * @param properties           properties of external catalog table
-  * @param stats                statistics of external catalog table
-  * @param comment              comment of external catalog table
-  * @param createTime           create time of external catalog table
-  * @param lastAccessTime       last access time of of external catalog table
+  * @param identifier           Identifier of the table (database name and table name)
+  * @param tableType            Table type, e.g. csv, hbase, kafka
+  * @param schema               Schema of the table (column names and types)
+  * @param properties           Properties of the table
+  * @param stats                Statistics of the table
+  * @param comment              Comment of the table
+  * @param createTime           Create timestamp of the table
+  * @param lastAccessTime       Timestamp of last access of the table
   */
 case class ExternalCatalogTable(
     identifier: TableIdentifier,
@@ -48,10 +48,10 @@ case class ExternalCatalogTable(
     lastAccessTime: JLong = -1L)
 
 /**
-  * Identifier of external catalog table
+  * Identifier for a catalog table.
   *
-  * @param database database name
-  * @param table    table name
+  * @param database Database name
+  * @param table    Table name
   */
 case class TableIdentifier(
     database: String,
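
As a sketch of how these definitions fit together, a CSV-backed table could be described as
shown below. The property keys, the TableSchema (fieldNames, fieldTypes) constructor, and the
assumption that the remaining ExternalCatalogTable parameters have defaults (as lastAccessTime
does) are illustrative only.

    import java.util.{HashMap => JHashMap}
    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.table.api.TableSchema

    // Hypothetical schema and properties of a CSV table.
    val schema = new TableSchema(
      Array("id", "name"),
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))

    val properties = new JHashMap[String, String]()
    properties.put("path", "/tmp/example.csv") // property keys are assumptions
    properties.put("fieldDelim", ",")

    val csvTable = ExternalCatalogTable(
      identifier = TableIdentifier("db1", "exampleTable"),
      tableType = "csv",
      schema = schema,
      properties = properties)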

http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
index 13e54a6..ca6df9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
@@ -22,26 +22,29 @@ import java.util.{Set => JSet}
 
 import org.apache.flink.table.sources.TableSource
 
-/** Defines a converter used to convert [[org.apache.flink.table.sources.TableSource]] to
-  * or from [[ExternalCatalogTable]].
+/** Creates a [[org.apache.flink.table.sources.TableSource]] from the properties of an
+  * [[ExternalCatalogTable]].
   *
-  * @tparam T The tableSource which to do convert operation on.
+  * The [[org.apache.flink.table.annotation.TableType]] annotation defines which type of external
+  * table is supported.
+  *
+  * @tparam T The [[TableSource]] to be created by this converter.
   */
 trait TableSourceConverter[T <: TableSource[_]] {
 
   /**
-    * Defines the required properties that must exists in the properties of an ExternalCatalogTable
-    * to ensure the input ExternalCatalogTable is compatible with the requirements of
-    * current converter.
-    * @return the required properties.
+    * Defines the properties that need to be provided by the [[ExternalCatalogTable]] to create
+    * the [[TableSource]].
+    *
+    * @return The required properties.
     */
   def requiredProperties: JSet[String]
 
   /**
-    * Converts the input external catalog table instance to a tableSource instance.
+    * Creates a [[TableSource]] for the given [[ExternalCatalogTable]].
     *
-    * @param externalCatalogTable input external catalog table instance to convert
-    * @return converted tableSource instance from input external catalog table.
+    * @param externalCatalogTable ExternalCatalogTable to create a TableSource from.
+    * @return The created TableSource.
     */
   def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): T
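
A converter for a hypothetical "csv" table type might look like the sketch below. The
annotation usage, the "path" property key, and the TableSchema accessor names are assumptions
made for illustration.

    import java.util.{Set => JSet}
    import scala.collection.JavaConverters._
    import org.apache.flink.table.annotation.TableType
    import org.apache.flink.table.sources.CsvTableSource

    // Hypothetical converter that builds a CsvTableSource from the table properties.
    @TableType("csv")
    class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {

      // The catalog table must provide at least the file path.
      override def requiredProperties: JSet[String] = Set("path").asJava

      override def fromExternalCatalogTable(
          externalCatalogTable: ExternalCatalogTable): CsvTableSource = {
        val schema = externalCatalogTable.schema
        new CsvTableSource(
          externalCatalogTable.properties.get("path"),
          schema.getColumnNames,
          schema.getTypes)
      }
    }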
 


[2/5] flink git commit: [FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.

Posted by fh...@apache.org.
[FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.

This closes #3564.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6949c8c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6949c8c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6949c8c7

Branch: refs/heads/master
Commit: 6949c8c79c41344023df08dde2936f06daa00e0d
Parents: f97deaa
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Thu Mar 16 11:11:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 38 ++++++++-
 .../flink/table/calcite/CalciteConfig.scala     | 89 +++++++++++++++++---
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  9 +-
 .../flink/table/CalciteConfigBuilderTest.scala  | 69 +++++++++++++++
 4 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..225a675 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.RuleSet
+import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -39,6 +39,8 @@ import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
+import _root_.scala.collection.JavaConverters._
+
 /**
   * The base class for stream TableEnvironments.
   *
@@ -211,6 +213,26 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Returns the decoration rule set for this environment
+    * including a custom RuleSet configuration.
+    */
+  protected def getDecoRuleSet: RuleSet = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getDecoRuleSet match {
+
+      case None =>
+        getBuiltInDecoRuleSet
+
+      case Some(ruleSet) =>
+        if (calciteConfig.replacesDecoRuleSet) {
+          ruleSet
+        } else {
+          RuleSets.ofList((getBuiltInDecoRuleSet.asScala ++ ruleSet.asScala).asJava)
+        }
+    }
+  }
+
+  /**
     * Returns the built-in normalization rules that are defined by the environment.
     */
   protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES
@@ -221,6 +243,11 @@ abstract class StreamTableEnvironment(
   protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
 
   /**
+    * Returns the built-in decoration rules that are defined by the environment.
+    */
+  protected def getBuiltInDecoRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_DECO_RULES
+
+  /**
     * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
     * @param relNode The root node of the relational expression tree.
@@ -248,7 +275,14 @@ abstract class StreamTableEnvironment(
       normalizedPlan
     }
 
-    optimizedPlan
+    // 4. decorate the optimized plan
+    val decoRuleSet = getDecoRuleSet
+    val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+      runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet)
+    } else {
+      optimizedPlan
+    }
+    decoratedPlan
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index 65a61b2..ba8df81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -31,15 +31,36 @@ import scala.collection.JavaConverters._
   * Builder for creating a Calcite configuration.
   */
 class CalciteConfigBuilder {
+
+  /**
+    * Defines the normalization rule set. Normalization rules are applied to rewrite the
+    * logical plan before the cost-based (volcano) optimization.
+    */
   private var replaceNormRules: Boolean = false
   private var normRuleSets: List[RuleSet] = Nil
 
+  /**
+    * Defines the optimization rule set. Optimization rules are used during volcano optimization.
+    */
   private var replaceOptRules: Boolean = false
   private var optRuleSets: List[RuleSet] = Nil
 
+  /**
+    * Defines the decoration rule set. Decoration rules are applied to rewrite the optimized
+    * plan after the cost-based (volcano) optimization.
+    */
+  private var replaceDecoRules: Boolean = false
+  private var decoRuleSets: List[RuleSet] = Nil
+
+  /**
+    * Defines the SQL operator tables.
+    */
   private var replaceOperatorTable: Boolean = false
   private var operatorTables: List[SqlOperatorTable] = Nil
 
+  /**
+    * Defines a SQL parser configuration.
+    */
   private var replaceSqlParserConfig: Option[SqlParser.Config] = None
 
   /**
@@ -81,6 +102,32 @@ class CalciteConfigBuilder {
   }
 
   /**
+    * Replaces the built-in decoration rule set with the given rule set.
+    *
+    * The decoration rules are applied after the cost-based optimization phase.
+    * The decoration phase allows rewriting the optimized plan and is not cost-based.
+    *
+    */
+  def replaceDecoRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceRuleSet)
+    decoRuleSets = List(replaceRuleSet)
+    replaceDecoRules = true
+    this
+  }
+
+  /**
+    * Appends the given decoration rule set to the built-in rule set.
+    *
+    * The decoration rules are applied after the cost-based optimization phase.
+    * The decoration phase allows rewriting the optimized plan and is not cost-based.
+    */
+  def addDecoRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedRuleSet)
+    decoRuleSets = addedRuleSet :: decoRuleSets
+    this
+  }
+
+  /**
     * Replaces the built-in SQL operator table with the given table.
     */
   def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
@@ -113,35 +160,39 @@ class CalciteConfigBuilder {
     val replacesNormRuleSet: Boolean,
     val getOptRuleSet: Option[RuleSet],
     val replacesOptRuleSet: Boolean,
+    val getDecoRuleSet: Option[RuleSet],
+    val replacesDecoRuleSet: Boolean,
     val getSqlOperatorTable: Option[SqlOperatorTable],
     val replacesSqlOperatorTable: Boolean,
     val getSqlParserConfig: Option[SqlParser.Config])
     extends CalciteConfig
 
+
   /**
-    * Builds a new [[CalciteConfig]].
+    * Converts a list of [[RuleSet]]s into a single optional [[RuleSet]].
     */
-  def build(): CalciteConfig = new CalciteConfigImpl(
-    normRuleSets match {
+  private def getRuleSet(inputRuleSet: List[RuleSet]): Option[RuleSet] = {
+    inputRuleSet match {
       case Nil => None
       case h :: Nil => Some(h)
       case _ =>
         // concat rule sets
         val concatRules =
-          normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
+          inputRuleSet.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
         Some(RuleSets.ofList(concatRules.asJava))
-    },
+    }
+  }
+
+  /**
+    * Builds a new [[CalciteConfig]].
+    */
+  def build(): CalciteConfig = new CalciteConfigImpl(
+    getRuleSet(normRuleSets),
     replaceNormRules,
-    optRuleSets match {
-      case Nil => None
-      case h :: Nil => Some(h)
-      case _ =>
-        // concat rule sets
-        val concatRules =
-          optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
-        Some(RuleSets.ofList(concatRules.asJava))
-    },
+    getRuleSet(optRuleSets),
     replaceOptRules,
+    getRuleSet(decoRuleSets),
+    replaceDecoRules,
     operatorTables match {
       case Nil => None
       case h :: Nil => Some(h)
@@ -179,6 +230,16 @@ trait CalciteConfig {
   def getOptRuleSet: Option[RuleSet]
 
   /**
+    * Returns whether this configuration replaces the built-in decoration rule set.
+    */
+  def replacesDecoRuleSet: Boolean
+
+  /**
+    * Returns a custom decoration rule set.
+    */
+  def getDecoRuleSet: Option[RuleSet]
+
+  /**
     * Returns whether this configuration replaces the built-in SQL operator table.
     */
   def replacesSqlOperatorTable: Boolean
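
From a user's perspective, the new decoration hooks are configured through the builder in the
same way as the normalization and optimization rule sets. A rough sketch follows, assuming a
StreamTableEnvironment `tableEnv` in scope and that the resulting CalciteConfig is applied via
TableConfig.setCalciteConfig:

    import org.apache.calcite.rel.rules.ReduceExpressionsRule
    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}

    // Append a custom rule to the built-in decoration rule set;
    // replaceDecoRuleSet(...) would discard the built-in rules instead.
    val cc: CalciteConfig = new CalciteConfigBuilder()
      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.CALC_INSTANCE))
      .build()

    tableEnv.getConfig.setCalciteConfig(cc)

With addDecoRuleSet the custom rules are concatenated with the built-in DATASTREAM_DECO_RULES;
with replaceDecoRuleSet they are used exclusively, which is what the replacesDecoRuleSet flag
above signals.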

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 952ee34..1301c8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -23,7 +23,6 @@ import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.table.calcite.rules.{FlinkAggregateExpandDistinctAggregatesRule, FlinkAggregateJoinTransposeRule}
 import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
 
 object FlinkRuleSets {
 
@@ -186,4 +185,12 @@ object FlinkRuleSets {
       PushFilterIntoStreamTableSourceScanRule.INSTANCE
   )
 
+  /**
+    * RuleSet to decorate plans for stream / DataStream execution
+    */
+  val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
+    // rules
+
+  )
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index 6c07e28..d0de8fa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -39,6 +39,9 @@ class CalciteConfigBuilderTest {
 
     assertFalse(cc.replacesOptRuleSet)
     assertFalse(cc.getOptRuleSet.isDefined)
+
+    assertFalse(cc.replacesDecoRuleSet)
+    assertFalse(cc.getDecoRuleSet.isDefined)
   }
 
   @Test
@@ -47,6 +50,7 @@ class CalciteConfigBuilderTest {
     val cc: CalciteConfig = new CalciteConfigBuilder()
       .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
       .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
       .build()
 
     assertFalse(cc.replacesNormRuleSet)
@@ -54,6 +58,9 @@ class CalciteConfigBuilderTest {
 
     assertTrue(cc.replacesOptRuleSet)
     assertTrue(cc.getOptRuleSet.isDefined)
+
+    assertTrue(cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
   }
 
   @Test
@@ -181,6 +188,68 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
+  def testReplaceDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+  }
+
+  @Test
+  def testReplaceDecorationAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(2, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+  }
+
+  @Test
+  def testAddDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+  }
+
+  @Test
+  def testAddAddDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
+                                      ReduceExpressionsRule.CALC_INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
+    assertEquals(3, cList.size)
+    assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
+    assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
+    assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+  }
+
+  @Test
   def testDefaultOperatorTable(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()