Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/24 19:23:35 UTC
[1/5] flink git commit: [FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.
Repository: flink
Updated Branches:
refs/heads/master 976e03c1e -> fe2c61a28
[FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.
This closes #3386.
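For reference, the kind of query this change enables looks like the following (the table and field names mirror the SqlITCase tests included in this patch and are illustrative only):

    val sqlQuery =
      "SELECT a, b, c, " +
      "SUM(b) OVER (PARTITION BY a ORDER BY rowtime() " +
      "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
      "FROM T1"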
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2c61a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2c61a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2c61a2
Branch: refs/heads/master
Commit: fe2c61a28e6a5300b2cf4c1e50ee884b51ef42c9
Parents: 7a9d39f
Author: hongyuhong 00223286 <ho...@huawei.com>
Authored: Fri Mar 24 09:31:59 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 62 +++--
.../table/runtime/aggregate/AggregateUtil.scala | 70 +++--
.../UnboundedEventTimeOverProcessFunction.scala | 224 ++++++++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 263 ++++++++++++++++++-
.../scala/stream/sql/WindowAggregateTest.scala | 64 ++++-
5 files changed, 634 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 547c875..3dd7ee2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -104,7 +104,7 @@ class DataStreamOverAggregate(
case _: ProcTimeType =>
// proc-time OVER window
if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
- // non-bounded OVER window
+ // unbounded preceding OVER window
createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
} else if (
overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
@@ -126,23 +126,15 @@ class DataStreamOverAggregate(
}
case _: RowTimeType =>
// row-time OVER window
- if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
- // non-bounded OVER window
- if (overWindow.isRows) {
- // ROWS clause unbounded OVER window
- throw new TableException(
- "ROWS clause unbounded row-time OVER window no supported yet.")
- } else {
- // RANGE clause unbounded OVER window
- throw new TableException(
- "RANGE clause unbounded row-time OVER window no supported yet.")
- }
- } else if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
- overWindow.upperBound.isCurrentRow) {
+ if (overWindow.lowerBound.isPreceding &&
+ overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // unbounded preceding OVER window
+ createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
+ } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
if (overWindow.isRows) {
// ROWS clause bounded OVER window
- createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, true)
+ createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, isRowTimeType = true)
} else {
// RANGE clause bounded OVER window
throw new TableException(
@@ -187,7 +179,7 @@ class DataStreamOverAggregate(
val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
namedAggregates,
inputType,
- false)
+ isPartitioned = false)
inputDS
.process(processFunction).setParallelism(1).setMaxParallelism(1)
@@ -205,7 +197,6 @@ class DataStreamOverAggregate(
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
- val inputFields = (0 until inputType.getFieldCount).toArray
val precedingOffset =
getLowerBoundary(logicWindow, overWindow, getInput()) + 1
@@ -216,7 +207,6 @@ class DataStreamOverAggregate(
val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
namedAggregates,
inputType,
- inputFields,
precedingOffset,
isRowTimeType
)
@@ -244,6 +234,42 @@ class DataStreamOverAggregate(
result
}
+ def createUnboundedAndCurrentRowEventTimeOverWindow(
+ inputDS: DataStream[Row]): DataStream[Row] = {
+
+ val overWindow: Group = logicWindow.groups.get(0)
+ val partitionKeys: Array[Int] = overWindow.keys.toArray
+ val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+ // get the output types
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
+ namedAggregates,
+ inputType)
+
+ val result: DataStream[Row] =
+ // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ inputDS.keyBy(partitionKeys: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ // global non-partitioned aggregation
+ else {
+ inputDS.keyBy(new NullByteKeySelector[Row])
+ .process(processFunction)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
+ }
+
private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = {
val overWindow: Group = logicWindow.groups.get(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 0084ee5..fdac692 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -104,7 +104,6 @@ object AggregateUtil {
private[flink] def createRowsClauseBoundedOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
- inputFields: Array[Int],
precedingOffset: Long,
isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
@@ -114,26 +113,49 @@ object AggregateUtil {
inputType,
needRetraction = true)
- val aggregationStateType: RowTypeInfo =
- createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+ val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
- val inputRowType: RowTypeInfo =
- createDataSetAggregateBufferDataType(inputFields, Array(), inputType)
+ if (isRowTimeType) {
+ new RowsClauseBoundedOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ inputRowType,
+ precedingOffset
+ )
+ } else {
+ throw TableException(
+ "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+ }
+ }
- val processFunction = if (isRowTimeType) {
- new RowsClauseBoundedOverProcessFunction(
- aggregates,
- aggFields,
- inputType.getFieldCount,
- aggregationStateType,
- inputRowType,
- precedingOffset
- )
- } else {
- throw TableException(
- "Bounded partitioned proc-time OVER aggregation is not supported yet.")
- }
- processFunction
+ /**
+ * Create a [[ProcessFunction]] for unbounded event-time OVER windows to evaluate the final aggregate value.
+ *
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param inputType Input row type
+ * @return [[UnboundedEventTimeOverProcessFunction]]
+ */
+ private[flink] def createUnboundedEventTimeOverProcessFunction(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+ val (aggFields, aggregates) =
+ transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = false)
+
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+ new UnboundedEventTimeOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
}
/**
@@ -595,7 +617,7 @@ object AggregateUtil {
// compute preaggregation type
val preAggFieldTypes = gkeyInFields
.map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(inputType, aggregates)
+ .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(aggregates)
val preAggRowType = new RowTypeInfo(preAggFieldTypes: _*)
(
@@ -701,7 +723,7 @@ object AggregateUtil {
val aggResultTypes = namedAggregates.map(a => FlinkTypeFactory.toTypeInfo(a.left.getType))
- val accumulatorRowType = createAccumulatorRowType(inputType, aggregates)
+ val accumulatorRowType = createAccumulatorRowType(aggregates)
val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
val aggFunction = new AggregateAggFunction(aggregates, aggFields)
@@ -1029,7 +1051,6 @@ object AggregateUtil {
}
private def createAccumulatorType(
- inputType: RelDataType,
aggregates: Array[TableAggregateFunction[_]]): Seq[TypeInformation[_]] = {
val aggTypes: Seq[TypeInformation[_]] =
@@ -1068,7 +1089,7 @@ object AggregateUtil {
.map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+ val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
// concat group key types, aggregation types, and window key types
val allFieldTypes: Seq[TypeInformation[_]] = windowKeyTypes match {
@@ -1079,10 +1100,9 @@ object AggregateUtil {
}
private def createAccumulatorRowType(
- inputType: RelDataType,
aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
- val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+ val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
new RowTypeInfo(aggTypes: _*)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
new file mode 100644
index 0000000..7616ede
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the field indexes used by the aggregate functions
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row type saved in the state
+ * @param inputType the input row type saved in the state
+ *
+ */
+class UnboundedEventTimeOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val intermediateType: TypeInformation[Row],
+ private val inputType: TypeInformation[Row])
+ extends ProcessFunction[Row, Row]{
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+ private var output: Row = _
+ // state to hold the accumulators of the aggregations
+ private var accumulatorState: ValueState[Row] = _
+ // state to hold rows until the next watermark arrives
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ // list to sort timestamps to access rows in timestamp order
+ private var sortedTimestamps: util.LinkedList[Long] = _
+
+
+ override def open(config: Configuration) {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ sortedTimestamps = new util.LinkedList[Long]()
+
+ // initialize accumulator state
+ val accDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+ accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
+
+ // initialize row state
+ val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+ rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+ }
+
+ /**
+ * Puts an element from the input stream into state if it is not late.
+ * Registers a timer for the next watermark.
+ *
+ * @param input The input value.
+ * @param ctx The context to register a timer or get the current time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ val timestamp = ctx.timestamp()
+ val curWatermark = ctx.timerService().currentWatermark()
+
+ // discard late record
+ if (timestamp >= curWatermark) {
+ // ensure every key just registers one timer
+ ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+
+ // put row into state
+ var rowList = rowMapState.get(timestamp)
+ if (rowList == null) {
+ rowList = new util.ArrayList[Row]()
+ }
+ rowList.add(input)
+ rowMapState.put(timestamp, rowList)
+ }
+ }
+
+ /**
+ * Called when a watermark arrives.
+ * Sorts records according to their timestamps, computes aggregates, and emits all records with
+ * timestamp smaller than the watermark in timestamp order.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx The context to register a timer or get the current time
+ * @param out The collector for returning result values.
+ */
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+
+ Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
+ val collector = out.asInstanceOf[TimestampedCollector[Row]]
+
+ val keyIterator = rowMapState.keys.iterator
+ if (keyIterator.hasNext) {
+ val curWatermark = ctx.timerService.currentWatermark
+ var existEarlyRecord: Boolean = false
+ var i = 0
+
+ // sort the record timestamps
+ do {
+ val recordTime = keyIterator.next
+ // only take timestamps smaller/equal to the watermark
+ if (recordTime <= curWatermark) {
+ insertToSortedList(recordTime)
+ } else {
+ existEarlyRecord = true
+ }
+ } while (keyIterator.hasNext)
+
+ // get last accumulator
+ var lastAccumulator = accumulatorState.value
+ if (lastAccumulator == null) {
+ // initialize accumulator
+ lastAccumulator = new Row(aggregates.length)
+ while (i < aggregates.length) {
+ lastAccumulator.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ // emit the rows in order
+ while (!sortedTimestamps.isEmpty) {
+ val curTimestamp = sortedTimestamps.removeFirst()
+ val curRowList = rowMapState.get(curTimestamp)
+ collector.setAbsoluteTimestamp(curTimestamp)
+
+ var j = 0
+ while (j < curRowList.size) {
+ val curRow = curRowList.get(j)
+ i = 0
+
+ // copy forwarded fields to output row
+ while (i < forwardedFieldCount) {
+ output.setField(i, curRow.getField(i))
+ i += 1
+ }
+
+ // update accumulators and copy aggregates to output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ // emit output row
+ collector.collect(output)
+ j += 1
+ }
+ rowMapState.remove(curTimestamp)
+ }
+
+ accumulatorState.update(lastAccumulator)
+
+ // if there are rows with timestamp > watermark, register a timer for the next watermark
+ if (existEarlyRecord) {
+ ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+ }
+ }
+ }
+
+ /**
+ * Inserts timestamps in order into a linked list.
+ *
+ * If timestamps arrive in order (as is the case when using the RocksDB state backend) this is just
+ * an append with O(1).
+ */
+ private def insertToSortedList(recordTimeStamp: Long) = {
+ val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
+ var continue = true
+ while (listIterator.hasPrevious && continue) {
+ val timestamp = listIterator.previous
+ if (recordTimeStamp >= timestamp) {
+ listIterator.next
+ listIterator.add(recordTimeStamp)
+ continue = false
+ }
+ }
+
+ if (continue) {
+ sortedTimestamps.addFirst(recordTimeStamp)
+ }
+ }
+
+}
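The function above buffers rows per timestamp in a MapState, and insertToSortedList keeps the pending timestamps ordered so that onTimer can emit rows in timestamp order once the watermark passes them. The insertion strategy can be illustrated with a standalone sketch (plain Scala, not part of the patch): scanning from the tail makes in-order arrivals an O(1) append.

    import java.util.LinkedList

    object SortedInsertSketch {
      // Insert a timestamp so the list stays ascending; scan from the tail so
      // that timestamps arriving in order are appended in O(1).
      def insert(sorted: LinkedList[Long], ts: Long): Unit = {
        val it = sorted.listIterator(sorted.size)
        var inserted = false
        while (it.hasPrevious && !inserted) {
          if (ts >= it.previous()) {
            it.next()
            it.add(ts)
            inserted = true
          }
        }
        if (!inserted) sorted.addFirst(ts)
      }

      def main(args: Array[String]): Unit = {
        val l = new LinkedList[Long]()
        Seq(1L, 3L, 2L, 5L).foreach(insert(l, _))
        println(l) // prints [1, 2, 3, 5]
      }
    }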
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 19350a7..34a68b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,8 +19,8 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{TableEnvironment, TableException}
@@ -436,6 +436,266 @@ class SqlITCase extends StreamingWithStateTestBase {
env.execute()
}
+ /** test sliding event-time unbounded window with partition by **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ // the next 3 elements are late
+ Left(14000008L, (1, 4L, "Hello world")),
+ Left(14000008L, (2, 3L, "Hello world")),
+ Left(14000008L, (3, 3L, "Hello world")),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ // the next 3 elements are late
+ Left(14000019L, (1, 6L, "Hello world")),
+ Left(14000018L, (2, 4L, "Hello world")),
+ Left(14000018L, (3, 4L, "Hello world")),
+ Left(14000022L, (2, 5L, "Hello world")),
+ Left(14000022L, (3, 5L, "Hello world")),
+ Left(14000024L, (1, 7L, "Hello world")),
+ Left(14000023L, (1, 8L, "Hello world")),
+ Left(14000021L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test sliding event-time unbounded window with partition by **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Left(14000024L, (3, 5L, "Hello world")),
+ Left(14000026L, (1, 7L, "Hello world")),
+ Left(14000025L, (1, 8L, "Hello world")),
+ Left(14000022L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test sliding event-time unbounded window without partition by **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ Left(14000004L, (3, 7L, "Hello world")),
+ Left(14000007L, (4, 9L, "Hello world")),
+ Left(14000008L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ // this element will be discarded because it is late
+ Left(14000008L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "3,7,Hello world,17,4,4,7,2",
+ "1,1,Hi,18,5,3,7,1",
+ "4,9,Hello world,27,6,4,9,1",
+ "5,8,Hello world,35,7,5,9,1",
+ "6,8,Hello world,43,8,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test sliding event-time unbounded window without partition by and with early arriving rows **/
+ @Test
+ def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ // next three elements are early
+ Left(14000012L, (3, 7L, "Hello world")),
+ Left(14000013L, (4, 9L, "Hello world")),
+ Left(14000014L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ Left(14000011L, (6, 8L, "Hello world")),
+ // next element is early
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "1,1,Hi,11,4,2,5,1",
+ "6,8,Hello world,19,5,3,8,1",
+ "3,7,Hello world,26,6,4,8,1",
+ "4,9,Hello world,35,7,5,9,1",
+ "5,8,Hello world,43,8,5,9,1",
+ "6,8,Hello world,51,9,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
object SqlITCase {
@@ -451,5 +711,4 @@ object SqlITCase {
override def cancel(): Unit = ???
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 9a425b3..7b8b2df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -241,12 +241,67 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ def testUnboundNonPartitionedEventTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testUnboundPartitionedEventTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
def testBoundPartitionedRowTimeWindowWithRow() = {
val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
val expected =
unaryNode(
@@ -294,4 +349,5 @@ class WindowAggregateTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
+
}
[4/5] flink git commit: [FLINK-5990] [table] Add event-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.
Posted by fh...@apache.org.
[FLINK-5990] [table] Add event-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.
This closes #3585.
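For reference, a query using the new bounded ROWS clause looks like the following (mirroring the SqlITCase tests in this patch; names are illustrative only):

    val sqlQuery =
      "SELECT c, a, " +
      "COUNT(a) OVER (PARTITION BY c ORDER BY RowTime() " +
      "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
      "FROM T1"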
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a9d39fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a9d39fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a9d39fe
Branch: refs/heads/master
Commit: 7a9d39fe9f659d43bf4719a2981f6c4771ffbe48
Parents: 6949c8c
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Sun Mar 19 23:31:00 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/nodes/OverAggregate.scala | 31 ++-
.../datastream/DataStreamOverAggregate.scala | 149 +++++++++---
.../table/runtime/aggregate/AggregateUtil.scala | 48 +++-
.../RowsClauseBoundedOverProcessFunction.scala | 239 +++++++++++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 139 ++++++++++-
.../scala/stream/sql/WindowAggregateTest.scala | 55 +++++
6 files changed, 623 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
index 793ab23..91c8cef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -18,12 +18,15 @@
package org.apache.flink.table.plan.nodes
-import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.{RelFieldCollation, RelNode}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rex.{RexInputRef}
import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+
import scala.collection.JavaConverters._
trait OverAggregate {
@@ -46,8 +49,16 @@ trait OverAggregate {
orderingString
}
- private[flink] def windowRange(overWindow: Group): String = {
- s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+ private[flink] def windowRange(
+ logicWindow: Window,
+ overWindow: Group,
+ input: RelNode): String = {
+ if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded) {
+ s"BETWEEN ${getLowerBoundary(logicWindow, overWindow, input)} PRECEDING " +
+ s"AND ${overWindow.upperBound}"
+ } else {
+ s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+ }
}
private[flink] def aggregationToString(
@@ -92,4 +103,18 @@ trait OverAggregate {
}.mkString(", ")
}
+ private[flink] def getLowerBoundary(
+ logicWindow: Window,
+ overWindow: Group,
+ input: RelNode): Long = {
+
+ val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
+ val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex;
+ val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
+ lowerBound match {
+ case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
+ case _ => lowerBound.asInstanceOf[Long]
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 34b3b0f..547c875 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -32,6 +32,7 @@ import org.apache.calcite.rel.core.Window
import org.apache.calcite.rel.core.Window.Group
import java.util.{List => JList}
+import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
@@ -70,9 +71,9 @@ class DataStreamOverAggregate(
super.explainTerms(pw)
.itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
- .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
- .itemIf("rows", windowRange(overWindow), overWindow.isRows)
- .itemIf("range", windowRange(overWindow), !overWindow.isRows)
+ .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
+ .itemIf("rows", windowRange(logicWindow, overWindow, getInput), overWindow.isRows)
+ .itemIf("range", windowRange(logicWindow, overWindow, getInput), !overWindow.isRows)
.item(
"select", aggregationToString(
inputType,
@@ -99,20 +100,58 @@ class DataStreamOverAggregate(
.getFieldList
.get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
.getValue
-
timeType match {
case _: ProcTimeType =>
- // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
- if (overWindow.lowerBound.isUnbounded &&
- overWindow.upperBound.isCurrentRow) {
+ // proc-time OVER window
+ if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // non-bounded OVER window
createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+ } else if (
+ overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+ overWindow.upperBound.isCurrentRow) {
+ // bounded OVER window
+ if (overWindow.isRows) {
+ // ROWS clause bounded OVER window
+ throw new TableException(
+ "ROWS clause bounded proc-time OVER window no supported yet.")
+ } else {
+ // RANGE clause bounded OVER window
+ throw new TableException(
+ "RANGE clause bounded proc-time OVER window no supported yet.")
+ }
} else {
throw new TableException(
- "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
- "condition.")
+ "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
+ "condition.")
}
case _: RowTimeType =>
- throw new TableException("OVER Window of the EventTime type is not currently supported.")
+ // row-time OVER window
+ if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // non-bounded OVER window
+ if (overWindow.isRows) {
+ // ROWS clause unbounded OVER window
+ throw new TableException(
+ "ROWS clause unbounded row-time OVER window no supported yet.")
+ } else {
+ // RANGE clause unbounded OVER window
+ throw new TableException(
+ "RANGE clause unbounded row-time OVER window no supported yet.")
+ }
+ } else if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+ overWindow.upperBound.isCurrentRow) {
+ // bounded OVER window
+ if (overWindow.isRows) {
+ // ROWS clause bounded OVER window
+ createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, true)
+ } else {
+ // RANGE clause bounded OVER window
+ throw new TableException(
+ "RANGE clause bounded row-time OVER window no supported yet.")
+ }
+ } else {
+ throw new TableException(
+ "row-time OVER window only support CURRENT ROW condition.")
+ }
case _ =>
throw new TableException(s"Unsupported time type {$timeType}")
}
@@ -120,7 +159,7 @@ class DataStreamOverAggregate(
}
def createUnboundedAndCurrentRowProcessingTimeOverWindow(
- inputDS: DataStream[Row]): DataStream[Row] = {
+ inputDS: DataStream[Row]): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -130,32 +169,78 @@ class DataStreamOverAggregate(
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
val result: DataStream[Row] =
- // partitioned aggregation
- if (partitionKeys.nonEmpty) {
- val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
- namedAggregates,
- inputType)
+ // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+ namedAggregates,
+ inputType)
- inputDS
+ inputDS
.keyBy(partitionKeys: _*)
.process(processFunction)
.returns(rowTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
- }
- // non-partitioned aggregation
- else {
- val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
- namedAggregates,
- inputType,
- false)
-
- inputDS
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataStream[Row]]
- }
+ }
+ // non-partitioned aggregation
+ else {
+ val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+ namedAggregates,
+ inputType,
+ false)
+
+ inputDS
+ .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
+ }
+
+ def createRowsClauseBoundedAndCurrentRowOverWindow(
+ inputDS: DataStream[Row],
+ isRowTimeType: Boolean = false): DataStream[Row] = {
+
+ val overWindow: Group = logicWindow.groups.get(0)
+ val partitionKeys: Array[Int] = overWindow.keys.toArray
+ val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+ val inputFields = (0 until inputType.getFieldCount).toArray
+
+ val precedingOffset =
+ getLowerBoundary(logicWindow, overWindow, getInput()) + 1
+
+ // get the output types
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
+ namedAggregates,
+ inputType,
+ inputFields,
+ precedingOffset,
+ isRowTimeType
+ )
+ val result: DataStream[Row] =
+ // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ inputDS
+ .keyBy(partitionKeys: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ // non-partitioned aggregation
+ else {
+ inputDS
+ .keyBy(new NullByteKeySelector[Row])
+ .process(processFunction)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
result
}
@@ -180,7 +265,7 @@ class DataStreamOverAggregate(
}
}ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
- s"${windowRange(overWindow)}, " +
+ s"${windowRange(logicWindow, overWindow, getInput)}, " +
s"select: (${
aggregationToString(
inputType,
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 9feec17..0084ee5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -61,7 +61,7 @@ object AggregateUtil {
* @param isPartitioned Flag to indicate whether the input is partitioned or not
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
- private[flink] def CreateUnboundedProcessingOverProcessFunction(
+ private[flink] def createUnboundedProcessingOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
isPartitioned: Boolean = true): ProcessFunction[Row, Row] = {
@@ -91,6 +91,52 @@ object AggregateUtil {
}
/**
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
+ * bounded OVER window to evaluate final aggregate value.
+ *
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param inputType Input row type
+ * @param inputFields All input fields
+ * @param precedingOffset the preceding offset
+ * @param isRowTimeType Flag indicating whether the time type is row-time
+ * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+ */
+ private[flink] def createRowsClauseBoundedOverProcessFunction(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ inputFields: Array[Int],
+ precedingOffset: Long,
+ isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+
+ val (aggFields, aggregates) =
+ transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = true)
+
+ val aggregationStateType: RowTypeInfo =
+ createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+
+ val inputRowType: RowTypeInfo =
+ createDataSetAggregateBufferDataType(inputFields, Array(), inputType)
+
+ val processFunction = if (isRowTimeType) {
+ new RowsClauseBoundedOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ inputRowType,
+ precedingOffset
+ )
+ } else {
+ throw TableException(
+ "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+ }
+ processFunction
+ }
+
+ /**
* Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
* The output of the function contains the grouping keys and the timestamp and the intermediate
* aggregate values of all aggregate function. The timestamp field is aligned to time window
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
new file mode 100644
index 0000000..1678d57
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window
+ *
+ * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType the row type info of input row
+ * @param precedingOffset the preceding offset
+ */
+class RowsClauseBoundedOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val aggregationStateType: RowTypeInfo,
+ private val inputRowType: RowTypeInfo,
+ private val precedingOffset: Long)
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkNotNull(forwardedFieldCount)
+ Preconditions.checkNotNull(aggregationStateType)
+ Preconditions.checkNotNull(precedingOffset)
+
+ private var output: Row = _
+
+ // the state which keeps the last triggering timestamp
+ private var lastTriggeringTsState: ValueState[Long] = _
+
+ // the state which keeps the count of data
+ private var dataCountState: ValueState[Long] = _
+
+ // the state which is used to materialize the accumulator for incremental calculation
+ private var accumulatorState: ValueState[Row] = _
+
+ // the state which keeps all the data that are not expired.
+ // The first element (as the mapState key) of the tuple is the time stamp. For each time stamp,
+ // the second element of the tuple is a list that contains all the rows belonging
+ // to this time stamp.
+ private var dataState: MapState[Long, JList[Row]] = _
+
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val dataCountStateDescriptor =
+ new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+ dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ // triggering timestamp for trigger calculation
+ val triggeringTs = ctx.timestamp
+
+ val lastTriggeringTs = lastTriggeringTsState.value
+ // check if the data is expired, if not, save the data and register event time timer
+
+ if (triggeringTs > lastTriggeringTs) {
+ val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new util.ArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
+ }
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+
+ // gets all window data from state for the calculation
+ val inputs: JList[Row] = dataState.get(timestamp)
+
+ if (null != inputs) {
+
+ var accumulators = accumulatorState.value
+ var dataCount = dataCountState.value
+
+ var retractList: JList[Row] = null
+ var retractTs: Long = Long.MaxValue
+ var retractCnt: Int = 0
+ var j = 0
+ var i = 0
+
+ while (j < inputs.size) {
+ val input = inputs.get(j)
+
+ // initialize when first run or failover recovery per key
+ if (null == accumulators) {
+ accumulators = new Row(aggregates.length)
+ i = 0
+ while (i < aggregates.length) {
+ accumulators.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ var retractRow: Row = null
+
+ if (dataCount >= precedingOffset) {
+ if (null == retractList) {
+ // find the smallest timestamp
+ retractTs = Long.MaxValue
+ val dataTimestampIt = dataState.keys.iterator
+ while (dataTimestampIt.hasNext) {
+ val dataTs = dataTimestampIt.next
+ if (dataTs < retractTs) {
+ retractTs = dataTs
+ }
+ }
+ // get the oldest rows to retract them
+ retractList = dataState.get(retractTs)
+ }
+
+ retractRow = retractList.get(retractCnt)
+ retractCnt += 1
+
+ // remove retracted values from state
+ if (retractList.size == retractCnt) {
+ dataState.remove(retractTs)
+ retractList = null
+ retractCnt = 0
+ }
+ } else {
+ dataCount += 1
+ }
+
+ // copy forwarded fields to output row
+ i = 0
+ while (i < forwardedFieldCount) {
+ output.setField(i, input.getField(i))
+ i += 1
+ }
+
+ // retract old row from accumulators
+ if (null != retractRow) {
+ i = 0
+ while (i < aggregates.length) {
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).retract(accumulator, retractRow.getField(aggFields(i)))
+ i += 1
+ }
+ }
+
+ // accumulate current row and set aggregate in output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ j += 1
+
+ out.collect(output)
+ }
+
+ // update all states
+ if (dataState.contains(retractTs)) {
+ if (retractCnt > 0) {
+ retractList.subList(0, retractCnt).clear()
+ dataState.put(retractTs, retractList)
+ }
+ }
+ dataCountState.update(dataCount)
+ accumulatorState.update(accumulators)
+ }
+
+ lastTriggeringTsState.update(timestamp)
+ }
+}
+
+
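The retraction logic above keeps the accumulator aligned with the ROWS window: once dataCount reaches precedingOffset, each new row first retracts the oldest buffered row before being accumulated, so the accumulator always reflects the last precedingOffset rows (the offset passed in by DataStreamOverAggregate is getLowerBoundary + 1, i.e. it includes the current row). A minimal standalone sketch of that accumulate/retract pattern for a sum (plain Scala, not Flink API; names are illustrative):

    import scala.collection.mutable

    // Sliding sum over ROWS BETWEEN nPreceding PRECEDING AND CURRENT ROW.
    class SlidingSum(nPreceding: Int) {
      private val buffer = mutable.Queue[Long]()
      private var acc = 0L

      def add(value: Long): Long = {
        if (buffer.size == nPreceding + 1) {
          acc -= buffer.dequeue() // retract the expired row
        }
        buffer.enqueue(value)
        acc += value // accumulate the current row
        acc
      }
    }

    // new SlidingSum(2) applied to 1, 2, 3, 4 yields 1, 3, 6, 9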
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index d5a140a..19350a7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,14 +19,18 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamingWithStateTestBase, StreamITCase,
-StreamTestData}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.collection.mutable
@@ -293,6 +297,120 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
+ val data = Seq(
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((3L, (7L, 7, "Hello World"))),
+ Left((1L, (7L, 7, "Hello World"))),
+ Left((1L, (7L, 7, "Hello World"))),
+ Right(2L),
+ Left((3L, (3L, 3, "Hello"))),
+ Left((4L, (4L, 4, "Hello"))),
+ Left((5L, (5L, 5, "Hello"))),
+ Left((6L, (6L, 6, "Hello"))),
+ Left((20L, (20L, 20, "Hello World"))),
+ Right(6L),
+ Left((8L, (8L, 8, "Hello World"))),
+ Left((7L, (7L, 7, "Hello World"))),
+ Right(20L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, a, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ " from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+ "Hello,2,3,4", "Hello,2,3,5","Hello,2,3,6",
+ "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+ "Hello,6,3,15",
+ "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+ "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testBoundNonPartitionedEventTimeWindowWithRow(): Unit = {
+
+ val data = Seq(
+ Left((2L, (2L, 2, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((20L, (20L, 20, "Hello World"))), // early row
+ Right(3L),
+ Left((2L, (2L, 2, "Hello"))), // late row
+ Left((3L, (3L, 3, "Hello"))),
+ Left((4L, (4L, 4, "Hello"))),
+ Left((5L, (5L, 5, "Hello"))),
+ Left((6L, (6L, 6, "Hello"))),
+ Left((7L, (7L, 7, "Hello World"))),
+ Right(7L),
+ Left((9L, (9L, 9, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Right(20L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, a, " +
+ "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
+ "sum(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+ "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+ "Hello,3,3,7",
+ "Hello,4,3,9", "Hello,5,3,12",
+ "Hello,6,3,15", "Hello World,7,3,18",
+ "Hello World,8,3,21", "Hello World,8,3,23",
+ "Hello World,9,3,25",
+ "Hello World,20,3,37")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
/**
* All aggregates must be computed on the same window.
*/
@@ -317,4 +435,21 @@ class SqlITCase extends StreamingWithStateTestBase {
result.addSink(new StreamITCase.StringSink)
env.execute()
}
+
+}
+
+object SqlITCase {
+
+ class EventTimeSourceFunction[T](
+ dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+ override def run(ctx: SourceContext[T]): Unit = {
+ dataWithTimestampList.foreach {
+ case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+ case Right(w) => ctx.emitWatermark(new Watermark(w))
+ }
+ }
+
+ override def cancel(): Unit = ???
+ }
+
}
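A note on the test data used above: the EventTimeSourceFunction interprets every `Left((ts, record))` as a record emitted with event-time timestamp `ts` and every `Right(ts)` as a watermark, as its `run` method shows. This is also why the row commented as late row (timestamp 2 arriving after the watermark at 3) does not appear in the expected results: rows at or below the last triggering watermark are discarded. A minimal sketch of building such a sequence, with illustrative values; the imports and env/tEnv setup mirror the tests above.

    // Two records at event-time 1 and 2, followed by a watermark at 2 that lets the
    // event-time OVER window emit results for all rows up to timestamp 2.
    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
      Left((1L, (1L, 1, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Right(2L))

    val t1 = env
      .addSource[(Long, Int, String)](new SqlITCase.EventTimeSourceFunction[(Long, Int, String)](data))
      .toTable(tEnv).as('a, 'b, 'c)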
http://git-wip-us.apache.org/repos/asf/flink/blob/7a9d39fe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index a25e59c..9a425b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -239,4 +239,59 @@ class WindowAggregateTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
+
+ @Test
+ def testBoundPartitionedRowTimeWindowWithRow() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "ROWTIME"),
+ term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testBoundNonPartitionedRowTimeWindowWithRow() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("orderBy", "ROWTIME"),
+ term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
}
[3/5] flink git commit: [FLINK-5570] [table] Register
ExternalCatalogs in TableEnvironment.
Posted by fh...@apache.org.
[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.
This closes #3409.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/135a57c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/135a57c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/135a57c4
Branch: refs/heads/master
Commit: 135a57c4bb37eaa9cb85faaff1cc694f9448fabd
Parents: 976e03c
Author: jingzhang <be...@126.com>
Authored: Thu Mar 16 11:24:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 37 +++++
.../flink/table/api/TableEnvironment.scala | 104 ++++++++++--
.../org/apache/flink/table/api/exceptions.scala | 62 +++++--
.../table/catalog/ExternalCatalogSchema.scala | 14 +-
.../flink/table/plan/logical/operators.scala | 4 +-
.../flink/table/ExternalCatalogTest.scala | 161 +++++++++++++++++++
.../catalog/ExternalCatalogSchemaTest.scala | 5 +-
7 files changed, 342 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 03b916c..117f32f 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers")
</div>
</div>
+Registering external Catalogs
+--------------------------------
+
+An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog)
+
+{% endhighlight %}
+</div>
+</div>
+
+Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path (`catalog`.`database`.`table`).
+
+Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
Table API
----------
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..bb4c3ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
@@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable.HashMap
+import _root_.scala.annotation.varargs
/**
* The abstract base class for batch and stream TableEnvironments.
@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the catalog to hold all registered and translated tables
// we disable caching here to prevent side effects
private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false)
- private val tables: SchemaPlus = internalSchema.plus()
+ private val rootSchema: SchemaPlus = internalSchema.plus()
// Table API/SQL function catalog
private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
@@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the configuration to create a Calcite planner
private lazy val frameworkConfig: FrameworkConfig = Frameworks
.newConfigBuilder
- .defaultSchema(tables)
+ .defaultSchema(rootSchema)
.parserConfig(getSqlParserConfig)
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
@@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) {
// a counter for unique attribute names
private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
+ // registered external catalog names -> catalog
+ private val externalCatalogs = new HashMap[String, ExternalCatalog]
+
/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig = config
@@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: TableConfig) {
}
/**
+ * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
+ * All tables registered in the [[ExternalCatalog]] can be accessed.
+ *
+ * @param name The name under which the externalCatalog will be registered
+ * @param externalCatalog The externalCatalog to register
+ */
+ def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+ if (rootSchema.getSubSchema(name) != null) {
+ throw new ExternalCatalogAlreadyExistException(name)
+ }
+ this.externalCatalogs.put(name, externalCatalog)
+ // create an external catalog Calcite schema and register it on the root schema
+ ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+ }
+
+ /**
+ * Gets a registered [[ExternalCatalog]] by name.
+ *
+ * @param name The name to look up the [[ExternalCatalog]]
+ * @return The [[ExternalCatalog]]
+ */
+ def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
+ this.externalCatalogs.get(name) match {
+ case Some(catalog) => catalog
+ case None => throw new ExternalCatalogNotExistException(name)
+ }
+ }
+
+ /**
* Registers a [[ScalarFunction]] under a unique name. Replaces already existing
* user-defined functions under this name.
*/
@@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) {
checkForInstantiation(function.getClass)
// register in Table API
+
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
@@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) {
protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
if (isRegistered(name)) {
- tables.add(name, table)
+ rootSchema.add(name, table)
} else {
throw new TableException(s"Table \'$name\' is not registered.")
}
@@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Scans a registered table and returns the resulting [[Table]].
*
- * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+ * A table to scan must be registered in the TableEnvironment. It can be either directly
+ * registered as a DataStream, DataSet, or Table, or as a member of an [[ExternalCatalog]].
+ *
+ * Examples:
*
- * @param tableName The name of the table to scan.
- * @throws ValidationException if no table is registered under the given name.
- * @return The scanned table.
+ * - Scanning a directly registered table
+ * {{{
+ * val tab: Table = tableEnv.scan("tableName")
+ * }}}
+ *
+ * - Scanning a table from a registered catalog
+ * {{{
+ * val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
+ * }}}
+ *
+ * @param tablePath The path of the table to scan.
+ * @throws TableException if no table is found using the given table path.
+ * @return The resulting [[Table]].
*/
- @throws[ValidationException]
- def scan(tableName: String): Table = {
- if (isRegistered(tableName)) {
- new Table(this, CatalogNode(tableName, getRowType(tableName)))
- } else {
- throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+ @throws[TableException]
+ @varargs
+ def scan(tablePath: String*): Table = {
+ scanInternal(tablePath.toArray)
+ }
+
+ @throws[TableException]
+ private def scanInternal(tablePath: Array[String]): Table = {
+ require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
+ val schemaPaths = tablePath.slice(0, tablePath.length - 1)
+ val schema = getSchema(schemaPaths)
+ if (schema != null) {
+ val tableName = tablePath(tablePath.length - 1)
+ val table = schema.getTable(tableName)
+ if (table != null) {
+ return new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))
+ }
+ }
+ throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found.")
+ }
+
+ private def getSchema(schemaPath: Array[String]): SchemaPlus = {
+ var schema = rootSchema
+ for (schemaName <- schemaPath) {
+ schema = schema.getSubSchema(schemaName)
+ if (schema == null) {
+ return schema
+ }
}
+ schema
}
/**
@@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: TableConfig) {
throw new TableException(s"Table \'$name\' already exists. " +
s"Please, choose a different name.")
} else {
- tables.add(name, table)
+ rootSchema.add(name, table)
}
}
@@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) {
* @return true, if a table is registered under the name, false otherwise.
*/
protected def isRegistered(name: String): Boolean = {
- tables.getTableNames.contains(name)
+ rootSchema.getTableNames.contains(name)
}
protected def getRowType(name: String): RelDataType = {
- tables.getTable(name).getRowType(typeFactory)
+ rootSchema.getTable(name).getRowType(typeFactory)
}
/** Returns a unique temporary attribute name. */
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 8632436..760cf75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -75,34 +75,34 @@ object ValidationException {
case class UnresolvedException(msg: String) extends RuntimeException(msg)
/**
- * Exception for operation on a nonexistent table
+ * Exception for an operation on a nonexistent table
*
* @param db database name
* @param table table name
- * @param cause
+ * @param cause the cause
*/
case class TableNotExistException(
db: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"table $db.$table does not exist!", cause) {
+ extends RuntimeException(s"Table $db.$table does not exist.", cause) {
def this(db: String, table: String) = this(db, table, null)
}
/**
- * Exception for adding an already existed table
+ * Exception for adding an already existent table
*
* @param db database name
* @param table table name
- * @param cause
+ * @param cause the cause
*/
case class TableAlreadyExistException(
db: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"table $db.$table already exists!", cause) {
+ extends RuntimeException(s"Table $db.$table already exists.", cause) {
def this(db: String, table: String) = this(db, table, null)
@@ -112,56 +112,84 @@ case class TableAlreadyExistException(
* Exception for operation on a nonexistent database
*
* @param db database name
- * @param cause
+ * @param cause the cause
*/
case class DatabaseNotExistException(
db: String,
cause: Throwable)
- extends RuntimeException(s"database $db does not exist!", cause) {
+ extends RuntimeException(s"Database $db does not exist.", cause) {
def this(db: String) = this(db, null)
}
/**
- * Exception for adding an already existed database
+ * Exception for adding an already existent database
*
* @param db database name
- * @param cause
+ * @param cause the cause
*/
case class DatabaseAlreadyExistException(
db: String,
cause: Throwable)
- extends RuntimeException(s"database $db already exists!", cause) {
+ extends RuntimeException(s"Database $db already exists.", cause) {
def this(db: String) = this(db, null)
}
/**
- * Exception for does not find any matched [[TableSourceConverter]] for a specified table type
+ * Exception for not finding a [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
- * @param cause
+ * @param cause the cause
*/
case class NoMatchedTableSourceConverterException(
tableType: String,
cause: Throwable)
- extends RuntimeException(s"find no table source converter matched table type $tableType!",
+ extends RuntimeException(s"Could not find a TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
/**
- * Exception for find more than one matched [[TableSourceConverter]] for a specified table type
+ * Exception for finding more than one [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
- * @param cause
+ * @param cause the cause
*/
case class AmbiguousTableSourceConverterException(
tableType: String,
cause: Throwable)
- extends RuntimeException(s"more than one table source converter matched table type $tableType!",
+ extends RuntimeException(s"More than one TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
+
+/**
+ * Exception for an operation on a nonexistent external catalog
+ *
+ * @param catalogName external catalog name
+ * @param cause the cause
+ */
+case class ExternalCatalogNotExistException(
+ catalogName: String,
+ cause: Throwable)
+ extends RuntimeException(s"External catalog $catalogName does not exist.", cause) {
+
+ def this(catalogName: String) = this(catalogName, null)
+}
+
+/**
+ * Exception for adding an already existent external catalog
+ *
+ * @param catalogName external catalog name
+ * @param cause the cause
+ */
+case class ExternalCatalogAlreadyExistException(
+ catalogName: String,
+ cause: Throwable)
+ extends RuntimeException(s"External catalog $catalogName already exists.", cause) {
+
+ def this(catalogName: String) = this(catalogName, null)
+}
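The two new exceptions map directly onto the catalog registration API added to the TableEnvironment above: registering the same name twice fails and looking up an unregistered name fails. A minimal sketch, with an illustrative catalog name and instance.

    val catalog = new InMemoryExternalCatalog

    tableEnv.registerExternalCatalog("test", catalog)     // ok
    // tableEnv.registerExternalCatalog("test", catalog)  // throws ExternalCatalogAlreadyExistException
    // tableEnv.getRegisteredExternalCatalog("missing")   // throws ExternalCatalogNotExistException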
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index e3ed96e..8e010fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -136,20 +136,18 @@ class ExternalCatalogSchema(
object ExternalCatalogSchema {
/**
- * Creates a FlinkExternalCatalogSchema.
+ * Registers an external catalog in a Calcite schema.
*
- * @param parentSchema Parent schema
- * @param externalCatalogIdentifier External catalog identifier
- * @param externalCatalog External catalog object
- * @return Created schema
+ * @param parentSchema Parent schema into which the catalog is registered
+ * @param externalCatalogIdentifier Identifier of the external catalog
+ * @param externalCatalog The external catalog to register
*/
- def create(
+ def registerCatalog(
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
- externalCatalog: ExternalCatalog): ExternalCatalogSchema = {
+ externalCatalog: ExternalCatalog): Unit = {
val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
- newSchema
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 1b5eafb..559bd75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -511,7 +511,7 @@ case class Join(
}
case class CatalogNode(
- tableName: String,
+ tablePath: Array[String],
rowType: RelDataType) extends LeafNode {
val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
@@ -519,7 +519,7 @@ case class CatalogNode(
}
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- relBuilder.scan(tableName)
+ relBuilder.scan(tablePath.toIterable.asJava)
}
override def validate(tableEnv: TableEnvironment): LogicalNode = this
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
new file mode 100644
index 0000000..696468d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Test for external catalog query plan.
+ */
+class ExternalCatalogTest extends TableTestBase {
+ private val table1Path: Array[String] = Array("test", "db1", "tb1")
+ private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+ private val table2Path: Array[String] = Array("test", "db2", "tb2")
+ private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+ @Test
+ def testBatchTableApi(): Unit = {
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tEnv.scan("test", "db1", "tb1")
+ val table2 = tEnv.scan("test", "db2", "tb2")
+ val result = table2
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testBatchSQL(): Unit = {
+ val util = batchTestUtil()
+
+ util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+ "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+ term("where", "<(d, 3)")),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS EXPR$0", "b", "c")
+ ),
+ term("union", "EXPR$0", "e", "g"))
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testStreamTableApi(): Unit = {
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tEnv.scan("test", "db1", "tb1")
+ val table2 = tEnv.scan("test", "db2", "tb2")
+
+ val result = table2.where("d < 3")
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+ term("where", "<(d, 3)")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamSQL(): Unit = {
+ val util = streamTestUtil()
+
+ util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+ "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+ term("where", "<(d, 3)")),
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS EXPR$0", "b", "c")
+ ),
+ term("union", "EXPR$0", "e", "g"))
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+ s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+ s"fields=[${fields.mkString(", ")}])"
+ }
+
+ def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+ s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+ s"fields=[${fields.mkString(", ")}])"
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 6ffa8c6..b780a3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
class ExternalCatalogSchemaTest {
private val schemaName: String = "test"
- private var externalCatalogSchema: ExternalCatalogSchema = _
+ private var externalCatalogSchema: SchemaPlus = _
private var calciteCatalogReader: CalciteCatalogReader = _
private val db = "db1"
private val tb = "tb1"
@@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog
- externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
+ ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+ externalCatalogSchema = rootSchemaPlus.getSubSchema(schemaName)
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
calciteCatalogReader = new CalciteCatalogReader(
CalciteSchema.from(rootSchemaPlus),
[5/5] flink git commit: [hotfix] [table] Improved code documentation
for external catalog.
Posted by fh...@apache.org.
[hotfix] [table] Improved code documentation for external catalog.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f97deaa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f97deaa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f97deaa9
Branch: refs/heads/master
Commit: f97deaa9683bf1868ecf104c73b997ede63e8856
Parents: 135a57c
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Mar 23 22:18:37 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../flink/table/annotation/TableType.java | 6 +-
.../table/catalog/CrudExternalCatalog.scala | 86 ++++++++++----------
.../flink/table/catalog/ExternalCatalog.scala | 40 ++++-----
.../table/catalog/ExternalCatalogDatabase.scala | 6 +-
.../table/catalog/ExternalCatalogTable.scala | 24 +++---
.../table/catalog/TableSourceConverter.scala | 23 +++---
6 files changed, 94 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 1cebe53..3845eae 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -27,7 +27,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * A {@link TableSourceConverter} with this annotation bind the converter with table type.
+ * Annotates a table type of a {@link TableSourceConverter}.
*/
@Documented
@Target(ElementType.TYPE)
@@ -36,9 +36,9 @@ import java.lang.annotation.Target;
public @interface TableType {
/**
- * Specifies the external catalog table type of {@link TableSourceConverter}.
+ * Returns the table type of a {@link TableSourceConverter}.
*
- * @return the external catalog table type of {@link TableSourceConverter}.
+ * @return The table type of the {@link TableSourceConverter}.
*/
String value();
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index d93f140..fcefa45 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -21,88 +21,86 @@ package org.apache.flink.table.catalog
import org.apache.flink.table.api._
/**
- * This class is responsible for interact with external catalog.
- * Its main responsibilities including:
- * <ul>
- * <li> create/drop/alter database or tables for DDL operations
- * <li> provide tables for calcite catalog, it looks up databases or tables in the external catalog
- * </ul>
+ * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables.
*/
trait CrudExternalCatalog extends ExternalCatalog {
/**
- * Adds table into external Catalog
+ * Adds a table to the catalog.
*
- * @param table description of table which to create
- * @param ignoreIfExists if table already exists in the catalog, not throw exception and leave
- * the existed table if ignoreIfExists is true;
- * else throw a TableAlreadyExistException.
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @throws TableAlreadyExistException if table already exists in the catalog and
- * ignoreIfExists is false
+ * @param table Description of the table to add
+ * @param ignoreIfExists Flag to specify behavior if a table with the given name already exists:
+ * if set to false, it throws a TableAlreadyExistException,
+ * if set to true, nothing happens.
+ * @throws DatabaseNotExistException thrown if database does not exist
+ * @throws TableAlreadyExistException thrown if table already exists and ignoreIfExists is false
*/
@throws[DatabaseNotExistException]
@throws[TableAlreadyExistException]
def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
/**
- * Deletes table from external Catalog
+ * Deletes table from a database of the catalog.
*
- * @param dbName database name
- * @param tableName table name
- * @param ignoreIfNotExists if table not exist yet, not throw exception if ignoreIfNotExists is
- * true; else throw TableNotExistException
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @throws TableNotExistException if table does not exist in the catalog yet
+ * @param dbName Name of the database
+ * @param tableName Name of the table
+ * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @throws TableNotExistException thrown if the table does not exist in the catalog
*/
@throws[DatabaseNotExistException]
@throws[TableNotExistException]
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
/**
- * Modifies an existing table in the external catalog
+ * Modifies an existing table in the catalog.
*
- * @param table description of table which to modify
- * @param ignoreIfNotExists if the table not exist yet, not throw exception if ignoreIfNotExists
- * is true; else throw TableNotExistException
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @throws TableNotExistException if table does not exist in the catalog yet
+ * @param table New description of the table to update
+ * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @throws TableNotExistException thrown if the table does not exist in the catalog
*/
@throws[DatabaseNotExistException]
@throws[TableNotExistException]
def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
/**
- * Adds database into external Catalog
+ * Adds a database to the catalog.
*
- * @param db description of database which to create
- * @param ignoreIfExists if database already exists in the catalog, not throw exception and leave
- * the existed database if ignoreIfExists is true;
- * else throw a DatabaseAlreadyExistException.
- * @throws DatabaseAlreadyExistException if database already exists in the catalog and
- * ignoreIfExists is false
+ * @param db Description of the database to create
+ * @param ignoreIfExists Flag to specify behavior if a database with the given name already
+ * exists: if set to false, it throws a DatabaseAlreadyExistException,
+ * if set to true, nothing happens.
+ * @throws DatabaseAlreadyExistException thrown if the database does already exist in the catalog
+ * and ignoreIfExists is false
*/
@throws[DatabaseAlreadyExistException]
def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
/**
- * Deletes database from external Catalog
+ * Deletes a database from the catalog.
*
- * @param dbName database name
- * @param ignoreIfNotExists if database not exist yet, not throw exception if ignoreIfNotExists
- * is true; else throw DatabaseNotExistException
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
+ * @param dbName Name of the database.
+ * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
*/
@throws[DatabaseNotExistException]
def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
/**
- * Modifies existed database into external Catalog
+ * Modifies an existing database in the catalog.
*
- * @param db description of database which to modify
- * @param ignoreIfNotExists if database not exist yet, not throw exception if ignoreIfNotExists
- * is true; else throw DatabaseNotExistException
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
+ * @param db New description of the database to update
+ * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
*/
@throws[DatabaseNotExistException]
def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
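A minimal sketch of the ignoreIfExists / ignoreIfNotExists contract documented above, restricted to databases for brevity. This is a hypothetical illustration only; Flink's InMemoryExternalCatalog implements the full trait.

    import scala.collection.mutable
    import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException}
    import org.apache.flink.table.catalog.ExternalCatalogDatabase

    // Simplified in-memory catalog that only manages databases.
    class TinyDatabaseCatalog {
      private val dbs = mutable.Map[String, ExternalCatalogDatabase]()

      def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit = {
        if (dbs.contains(db.dbName)) {
          // database exists: either ignore or fail, as documented above
          if (!ignoreIfExists) throw new DatabaseAlreadyExistException(db.dbName)
        } else {
          dbs += db.dbName -> db
        }
      }

      def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit = {
        // database missing: either ignore or fail, as documented above
        if (dbs.remove(dbName).isEmpty && !ignoreIfNotExists) {
          throw new DatabaseNotExistException(dbName)
        }
      }
    }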
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 58b62c7..00a35e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -23,49 +23,51 @@ import java.util.{List => JList}
import org.apache.flink.table.api._
/**
- * This class is responsible for read table/database from external catalog.
- * Its main responsibilities is provide tables for calcite catalog, it looks up databases or tables
- * in the external catalog.
+ * An [[ExternalCatalog]] is the connector between an external database catalog and Flink's
+ * Table API.
+ *
+ * It provides information about databases and tables such as names, schema, statistics, and
+ * access information.
*/
trait ExternalCatalog {
/**
- * Gets table from external Catalog
+ * Get a table from the catalog
*
- * @param dbName database name
- * @param tableName table name
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @throws TableNotExistException if table does not exist in the catalog yet
- * @return found table
+ * @param dbName The name of the table's database.
+ * @param tableName The name of the table.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog.
+ * @throws TableNotExistException thrown if the table does not exist in the catalog.
+ * @return the requested table
*/
@throws[DatabaseNotExistException]
@throws[TableNotExistException]
def getTable(dbName: String, tableName: String): ExternalCatalogTable
/**
- * Gets the table name lists from current external Catalog
+ * Get a list of all table names of a database in the catalog.
*
- * @param dbName database name
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @return lists of table name
+ * @param dbName The name of the database.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @return The list of table names
*/
@throws[DatabaseNotExistException]
def listTables(dbName: String): JList[String]
/**
- * Gets database from external Catalog
+ * Gets a database from the catalog.
*
- * @param dbName database name
- * @throws DatabaseNotExistException if database does not exist in the catalog yet
- * @return found database
+ * @param dbName The name of the database.
+ * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @return The requested database
*/
@throws[DatabaseNotExistException]
def getDatabase(dbName: String): ExternalCatalogDatabase
/**
- * Gets the database name lists from current external Catalog
+ * Gets a list of all databases in the catalog.
*
- * @return list of database names
+ * @return The list of database names
*/
def listDatabases(): JList[String]
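Read access through the trait composes in the obvious way. A minimal sketch that walks a catalog and prints every table together with its type; it is a pure traversal and involves no Flink runtime.

    import scala.collection.JavaConverters._
    import org.apache.flink.table.catalog.ExternalCatalog

    // List every table of every database in the catalog, with its table type.
    def describeCatalog(catalog: ExternalCatalog): Unit = {
      for {
        db    <- catalog.listDatabases().asScala
        table <- catalog.listTables(db).asScala
      } {
        val catalogTable = catalog.getTable(db, table)
        println(s"$db.$table (type: ${catalogTable.tableType})")
      }
    }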
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
index c2a4702..99ab2eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.catalog
import java.util.{HashMap => JHashMap, Map => JMap}
/**
- * Database definition of the external catalog.
+ * Defines a database in an [[ExternalCatalog]].
*
- * @param dbName database name
- * @param properties database properties
+ * @param dbName The name of the database
+ * @param properties The properties of the database
*/
case class ExternalCatalogDatabase(
dbName: String,
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 893cbb3..4fdab66 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -26,16 +26,16 @@ import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.plan.stats.TableStats
/**
- * Table definition of the external catalog.
+ * Defines a table in an [[ExternalCatalog]].
*
- * @param identifier identifier of external catalog table, including dbName and tableName
- * @param tableType type of external catalog table, e.g csv, hbase, kafka
- * @param schema schema of table data, including column names and column types
- * @param properties properties of external catalog table
- * @param stats statistics of external catalog table
- * @param comment comment of external catalog table
- * @param createTime create time of external catalog table
- * @param lastAccessTime last access time of of external catalog table
+ * @param identifier Identifier of the table (database name and table name)
+ * @param tableType Table type, e.g csv, hbase, kafka
+ * @param schema Schema of the table (column names and types)
+ * @param properties Properties of the table
+ * @param stats Statistics of the table
+ * @param comment Comment of the table
+ * @param createTime Create timestamp of the table
+ * @param lastAccessTime Timestamp of last access of the table
*/
case class ExternalCatalogTable(
identifier: TableIdentifier,
@@ -48,10 +48,10 @@ case class ExternalCatalogTable(
lastAccessTime: JLong = -1L)
/**
- * Identifier of external catalog table
+ * Identifier for a catalog table.
*
- * @param database database name
- * @param table table name
+ * @param database Database name
+ * @param table Table name
*/
case class TableIdentifier(
database: String,
http://git-wip-us.apache.org/repos/asf/flink/blob/f97deaa9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
index 13e54a6..ca6df9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
@@ -22,26 +22,29 @@ import java.util.{Set => JSet}
import org.apache.flink.table.sources.TableSource
-/** Defines a converter used to convert [[org.apache.flink.table.sources.TableSource]] to
- * or from [[ExternalCatalogTable]].
+/** Creates a [[org.apache.flink.table.sources.TableSource]] from the properties of an
+ * [[ExternalCatalogTable]].
*
- * @tparam T The tableSource which to do convert operation on.
+ * The [[org.apache.flink.table.annotation.TableType]] annotation defines which type of external
+ * table is supported.
+ *
+ * @tparam T The [[TableSource]] to be created by this converter.
*/
trait TableSourceConverter[T <: TableSource[_]] {
/**
- * Defines the required properties that must exists in the properties of an ExternalCatalogTable
- * to ensure the input ExternalCatalogTable is compatible with the requirements of
- * current converter.
- * @return the required properties.
+ * Defines the properties that need to be provided by the [[ExternalCatalogTable]] to create
+ * the [[TableSource]].
+ *
+ * @return The required properties.
*/
def requiredProperties: JSet[String]
/**
- * Converts the input external catalog table instance to a tableSource instance.
+ * Creates a [[TableSource]] for the given [[ExternalCatalogTable]].
*
- * @param externalCatalogTable input external catalog table instance to convert
- * @return converted tableSource instance from input external catalog table.
+ * @param externalCatalogTable ExternalCatalogTable to create a TableSource from.
+ * @return The created TableSource.
*/
def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): T
[2/5] flink git commit: [FLINK-6089] [table] Add decoration phase for
stream queries to rewrite plans after the cost-based optimization.
Posted by fh...@apache.org.
[FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.
This closes #3564.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6949c8c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6949c8c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6949c8c7
Branch: refs/heads/master
Commit: 6949c8c79c41344023df08dde2936f06daa00e0d
Parents: f97deaa
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Thu Mar 16 11:11:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 38 ++++++++-
.../flink/table/calcite/CalciteConfig.scala | 89 +++++++++++++++++---
.../flink/table/plan/rules/FlinkRuleSets.scala | 9 +-
.../flink/table/CalciteConfigBuilderTest.scala | 69 +++++++++++++++
4 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..225a675 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.RuleSet
+import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
@@ -39,6 +39,8 @@ import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.types.Row
+import _root_.scala.collection.JavaConverters._
+
/**
* The base class for stream TableEnvironments.
*
@@ -211,6 +213,26 @@ abstract class StreamTableEnvironment(
}
/**
+ * Returns the decoration rule set for this environment
+ * including a custom RuleSet configuration.
+ */
+ protected def getDecoRuleSet: RuleSet = {
+ val calciteConfig = config.getCalciteConfig
+ calciteConfig.getDecoRuleSet match {
+
+ case None =>
+ getBuiltInDecoRuleSet
+
+ case Some(ruleSet) =>
+ if (calciteConfig.replacesDecoRuleSet) {
+ ruleSet
+ } else {
+ RuleSets.ofList((getBuiltInDecoRuleSet.asScala ++ ruleSet.asScala).asJava)
+ }
+ }
+ }
+
+ /**
* Returns the built-in normalization rules that are defined by the environment.
*/
protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES
@@ -221,6 +243,11 @@ abstract class StreamTableEnvironment(
protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
/**
+ * Returns the built-in decoration rules that are defined by the environment.
+ */
+ protected def getBuiltInDecoRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_DECO_RULES
+
+ /**
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* @param relNode The root node of the relational expression tree.
@@ -248,7 +275,14 @@ abstract class StreamTableEnvironment(
normalizedPlan
}
- optimizedPlan
+ // 4. decorate the optimized plan
+ val decoRuleSet = getDecoRuleSet
+ val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+ runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet)
+ } else {
+ optimizedPlan
+ }
+ decoratedPlan
}
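getDecoRuleSet follows the same pattern as the normalization and optimization rule sets: the built-in DATASTREAM_DECO_RULES apply unless a custom rule set is appended or, with replacesDecoRuleSet, substituted. A minimal sketch of wiring a custom decoration rule in from user code via the addDecoRuleSet builder method shown in the CalciteConfig diff below; MyDecoRule is a placeholder for a user-defined RelOptRule, and setting the resulting CalciteConfig on the TableConfig is assumed to be supported.

    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.table.calcite.CalciteConfigBuilder

    // Append a custom decoration rule to the built-in decoration phase.
    val calciteConfig = new CalciteConfigBuilder()
      .addDecoRuleSet(RuleSets.ofList(MyDecoRule.INSTANCE))  // MyDecoRule: placeholder RelOptRule
      .build()

    tableEnv.getConfig.setCalciteConfig(calciteConfig)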
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index 65a61b2..ba8df81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -31,15 +31,36 @@ import scala.collection.JavaConverters._
* Builder for creating a Calcite configuration.
*/
class CalciteConfigBuilder {
+
+ /**
+ * Defines the normalization rule set. Normalization rules are dedicated to rewriting the
+ * logical plan before the volcano (cost-based) optimization.
+ */
private var replaceNormRules: Boolean = false
private var normRuleSets: List[RuleSet] = Nil
+ /**
+ * Defines the optimization rule set. Optimization rules are used during volcano optimization.
+ */
private var replaceOptRules: Boolean = false
private var optRuleSets: List[RuleSet] = Nil
+ /**
+ * Defines the decoration rule set. Decoration rules are dedicated to rewriting the
+ * predicated logical plan after volcano optimization.
+ */
+ private var replaceDecoRules: Boolean = false
+ private var decoRuleSets: List[RuleSet] = Nil
+
+ /**
+ * Defines the SQL operator tables.
+ */
private var replaceOperatorTable: Boolean = false
private var operatorTables: List[SqlOperatorTable] = Nil
+ /**
+ * Defines a SQL parser configuration.
+ */
private var replaceSqlParserConfig: Option[SqlParser.Config] = None
/**
@@ -81,6 +102,32 @@ class CalciteConfigBuilder {
}
/**
+ * Replaces the built-in decoration rule set with the given rule set.
+ *
+ * The decoration rules are applied after the cost-based optimization phase.
+ * The decoration phase allows rewriting the optimized plan and is not cost-based.
+ *
+ */
+ def replaceDecoRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(replaceRuleSet)
+ decoRuleSets = List(replaceRuleSet)
+ replaceDecoRules = true
+ this
+ }
+
+ /**
+ * Appends the given decoration rule set to the built-in rule set.
+ *
+ * The decoration rules are applied after the cost-based optimization phase.
+ * The decoration phase allows rewriting the optimized plan and is not cost-based.
+ */
+ def addDecoRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(addedRuleSet)
+ decoRuleSets = addedRuleSet :: decoRuleSets
+ this
+ }
+
+ /**
* Replaces the built-in SQL operator table with the given table.
*/
def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
@@ -113,35 +160,39 @@ class CalciteConfigBuilder {
val replacesNormRuleSet: Boolean,
val getOptRuleSet: Option[RuleSet],
val replacesOptRuleSet: Boolean,
+ val getDecoRuleSet: Option[RuleSet],
+ val replacesDecoRuleSet: Boolean,
val getSqlOperatorTable: Option[SqlOperatorTable],
val replacesSqlOperatorTable: Boolean,
val getSqlParserConfig: Option[SqlParser.Config])
extends CalciteConfig
+
/**
- * Builds a new [[CalciteConfig]].
+ * Converts a list of [[RuleSet]]s into an [[Option]] of a single, merged [[RuleSet]].
*/
- def build(): CalciteConfig = new CalciteConfigImpl(
- normRuleSets match {
+ private def getRuleSet(inputRuleSet: List[RuleSet]): Option[RuleSet] = {
+ inputRuleSet match {
case Nil => None
case h :: Nil => Some(h)
case _ =>
// concat rule sets
val concatRules =
- normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
+ inputRuleSet.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
Some(RuleSets.ofList(concatRules.asJava))
- },
+ }
+ }
+
+ /**
+ * Builds a new [[CalciteConfig]].
+ */
+ def build(): CalciteConfig = new CalciteConfigImpl(
+ getRuleSet(normRuleSets),
replaceNormRules,
- optRuleSets match {
- case Nil => None
- case h :: Nil => Some(h)
- case _ =>
- // concat rule sets
- val concatRules =
- optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
- Some(RuleSets.ofList(concatRules.asJava))
- },
+ getRuleSet(optRuleSets),
replaceOptRules,
+ getRuleSet(decoRuleSets),
+ replaceDecoRules,
operatorTables match {
case Nil => None
case h :: Nil => Some(h)
@@ -179,6 +230,16 @@ trait CalciteConfig {
def getOptRuleSet: Option[RuleSet]
/**
+ * Returns whether this configuration replaces the built-in decoration rule set.
+ */
+ def replacesDecoRuleSet: Boolean
+
+ /**
+ * Returns a custom decoration rule set.
+ */
+ def getDecoRuleSet: Option[RuleSet]
+
+ /**
* Returns whether this configuration replaces the built-in SQL operator table.
*/
def replacesSqlOperatorTable: Boolean
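To make the replace/add semantics of the new builder methods concrete, a short sketch mirroring the tests added further below: replaceDecoRuleSet discards the built-in decoration rules and starts from the given set, while addDecoRuleSet appends to whatever was configured before. The stand-in rules are Calcite's ReduceExpressionsRule instances, as used by those tests.

  import org.apache.calcite.rel.rules.ReduceExpressionsRule
  import org.apache.calcite.tools.RuleSets
  import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}

  // replace: the built-in decoration rules are dropped, only FILTER_INSTANCE remains
  val replaced: CalciteConfig = new CalciteConfigBuilder()
    .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
    .build()

  // add: both rule sets are appended to the built-in decoration rules
  val appended: CalciteConfig = new CalciteConfigBuilder()
    .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
    .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
    .build()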
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 952ee34..1301c8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -23,7 +23,6 @@ import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.calcite.rules.{FlinkAggregateExpandDistinctAggregatesRule, FlinkAggregateJoinTransposeRule}
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
object FlinkRuleSets {
@@ -186,4 +185,12 @@ object FlinkRuleSets {
PushFilterIntoStreamTableSourceScanRule.INSTANCE
)
+ /**
+ * RuleSet to decorate plans for stream / DataStream execution
+ */
+ val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
+ // rules
+
+ )
+
}
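DATASTREAM_DECO_RULES is intentionally left empty by this commit; decoration rules are expected to be registered here by follow-up changes. Purely as a hypothetical illustration (MyStreamDecoRule does not exist), an entry would look like this:

  // Hypothetical only: a future decoration rule would be listed like any other entry.
  val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
    MyStreamDecoRule.INSTANCE
  )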
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index 6c07e28..d0de8fa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -39,6 +39,9 @@ class CalciteConfigBuilderTest {
assertFalse(cc.replacesOptRuleSet)
assertFalse(cc.getOptRuleSet.isDefined)
+
+ assertFalse(cc.replacesDecoRuleSet)
+ assertFalse(cc.getDecoRuleSet.isDefined)
}
@Test
@@ -47,6 +50,7 @@ class CalciteConfigBuilderTest {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.build()
assertFalse(cc.replacesNormRuleSet)
@@ -54,6 +58,9 @@ class CalciteConfigBuilderTest {
assertTrue(cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
+
+ assertTrue(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
}
@Test
@@ -181,6 +188,68 @@ class CalciteConfigBuilderTest {
}
@Test
+ def testReplaceDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .build()
+
+ assertTrue(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ }
+
+ @Test
+ def testReplaceDecorationAddRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+ .build()
+
+ assertTrue(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(2, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+ }
+
+ @Test
+ def testAddDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .build()
+
+ assertFalse(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ }
+
+ @Test
+ def testAddAddDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE))
+ .build()
+
+ assertFalse(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
+ assertEquals(3, cList.size)
+ assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
+ assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
+ assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+ }
+
+ @Test
def testDefaultOperatorTable(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()