You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:45 UTC
[11/15] flink git commit: [FLINK-6091] [table] Implement and turn on
retraction for non-windowed aggregates.
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
new file mode 100644
index 0000000..8e95c93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+ * MapRunner with [[CRow]] input.
+ */
+class CRowInputMapRunner[OUT](
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[OUT])
+ extends RichMapFunction[CRow, OUT]
+ with ResultTypeQueryable[OUT]
+ with Compiler[MapFunction[Row, OUT]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var function: MapFunction[Row, OUT] = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating MapFunction.")
+ function = clazz.newInstance()
+ }
+
+ override def map(in: CRow): OUT = {
+ function.map(in.row)
+ }
+
+ override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
new file mode 100644
index 0000000..966dea9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+ * MapRunner with [[CRow]] output.
+ */
+class CRowOutputMapRunner(
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[CRow])
+ extends RichMapFunction[Any, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[MapFunction[Any, Row]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var function: MapFunction[Any, Row] = _
+ private var outCRow: CRow = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating MapFunction.")
+ function = clazz.newInstance()
+ outCRow = new CRow(null, true)
+ }
+
+ override def map(in: Any): CRow = {
+ outCRow.row = function.map(in)
+ outCRow
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
new file mode 100644
index 0000000..b2b062e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * The collector is used to wrap a [[Row]] into a [[CRow]].
+ */
+class CRowWrappingCollector() extends Collector[Row] {
+
+ var out: Collector[CRow] = _
+ val outCRow: CRow = new CRow()
+
+ def setChange(change: Boolean): Unit = this.outCRow.change = change
+
+ override def collect(record: Row): Unit = {
+ outCRow.row = record
+ out.collect(outCRow)
+ }
+
+ override def close(): Unit = out.close()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index b446306..2e37baf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -24,20 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
-class FlatMapRunner[IN, OUT](
+class FlatMapRunner(
name: String,
code: String,
- @transient returnType: TypeInformation[OUT])
- extends RichFlatMapFunction[IN, OUT]
- with ResultTypeQueryable[OUT]
- with Compiler[FlatMapFunction[IN, OUT]] {
+ @transient returnType: TypeInformation[Row])
+ extends RichFlatMapFunction[Row, Row]
+ with ResultTypeQueryable[Row]
+ with Compiler[FlatMapFunction[Row, Row]] {
val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: FlatMapFunction[IN, OUT] = _
+ private var function: FlatMapFunction[Row, Row] = _
override def open(parameters: Configuration): Unit = {
LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
@@ -48,10 +49,10 @@ class FlatMapRunner[IN, OUT](
FunctionUtils.openFunction(function, parameters)
}
- override def flatMap(in: IN, out: Collector[OUT]): Unit =
+ override def flatMap(in: Row, out: Collector[Row]): Unit =
function.flatMap(in, out)
- override def getProducedType: TypeInformation[OUT] = returnType
+ override def getProducedType: TypeInformation[Row] = returnType
override def close(): Unit = {
FunctionUtils.closeFunction(function)
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 377e0ff..dd9c015 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.slf4j.LoggerFactory
@@ -30,28 +31,28 @@ import org.slf4j.LoggerFactory
* @param genAggregations Generated aggregate helper function
*/
class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
- extends AggregateFunction[Row, Row, Row] with Compiler[GeneratedAggregations] {
+ extends AggregateFunction[CRow, Row, Row] with Compiler[GeneratedAggregations] {
val LOG = LoggerFactory.getLogger(this.getClass)
private var function: GeneratedAggregations = _
override def createAccumulator(): Row = {
if (function == null) {
- initFunction
+ initFunction()
}
function.createAccumulators()
}
- override def add(value: Row, accumulatorRow: Row): Unit = {
+ override def add(value: CRow, accumulatorRow: Row): Unit = {
if (function == null) {
- initFunction
+ initFunction()
}
- function.accumulate(accumulatorRow, value)
+ function.accumulate(accumulatorRow, value.row)
}
override def getResult(accumulatorRow: Row): Row = {
if (function == null) {
- initFunction
+ initFunction()
}
val output = function.createOutputRow()
function.setAggregationResults(accumulatorRow, output)
@@ -60,7 +61,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
if (function == null) {
- initFunction
+ initFunction()
}
function.mergeAccumulatorsPair(aAccumulatorRow, bAccumulatorRow)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 5e9efd0..768c9cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -44,6 +44,7 @@ import org.apache.flink.table.functions.utils.AggSqlFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
import org.apache.flink.types.Row
@@ -79,7 +80,7 @@ object AggregateUtil {
isRowTimeType: Boolean,
isPartitioned: Boolean,
isRowsClause: Boolean)
- : ProcessFunction[Row, Row] = {
+ : ProcessFunction[CRow, CRow] = {
val (aggFields, aggregates) =
transformToAggregateFunctions(
@@ -116,13 +117,13 @@ object AggregateUtil {
new RowTimeUnboundedRowsOver(
genFunction,
aggregationStateType,
- inputTypeInfo)
+ CRowTypeInfo(inputTypeInfo))
} else {
// RANGE unbounded over process function
new RowTimeUnboundedRangeOver(
genFunction,
aggregationStateType,
- inputTypeInfo)
+ CRowTypeInfo(inputTypeInfo))
}
} else {
if (isPartitioned) {
@@ -153,13 +154,16 @@ object AggregateUtil {
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputRowType: RelDataType,
inputFieldTypes: Seq[TypeInformation[_]],
- groupings: Array[Int]): ProcessFunction[Row, Row] = {
+ groupings: Array[Int],
+ generateRetraction: Boolean,
+ consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
val (aggFields, aggregates) =
transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputRowType,
- needRetraction = false)
+ consumeRetraction)
+
val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
val outputArity = groupings.length + aggregates.length
@@ -178,14 +182,16 @@ object AggregateUtil {
None,
None,
outputArity,
- needRetract = false,
+ consumeRetraction,
needMerge = false,
needReset = false
)
new GroupAggProcessFunction(
genFunction,
- aggregationStateType)
+ aggregationStateType,
+ generateRetraction)
+
}
/**
@@ -198,7 +204,7 @@ object AggregateUtil {
* @param inputTypeInfo Physical type information of the row.
* @param inputFieldTypeInfo Physical type information of the row's fields.
* @param precedingOffset the preceding offset
- * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
+ * @param isRowsClause It is a tag that indicates whether the OVER clause is a ROWS clause
* @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
@@ -211,7 +217,7 @@ object AggregateUtil {
precedingOffset: Long,
isRowsClause: Boolean,
isRowTimeType: Boolean)
- : ProcessFunction[Row, Row] = {
+ : ProcessFunction[CRow, CRow] = {
val needRetract = true
val (aggFields, aggregates) =
@@ -221,6 +227,7 @@ object AggregateUtil {
needRetract)
val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+ val inputRowType = CRowTypeInfo(inputTypeInfo)
val forwardMapping = (0 until inputType.getFieldCount).toArray
val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -248,14 +255,14 @@ object AggregateUtil {
new RowTimeBoundedRowsOver(
genFunction,
aggregationStateType,
- inputTypeInfo,
+ inputRowType,
precedingOffset
)
} else {
new RowTimeBoundedRangeOver(
genFunction,
aggregationStateType,
- inputTypeInfo,
+ inputRowType,
precedingOffset
)
}
@@ -265,13 +272,13 @@ object AggregateUtil {
genFunction,
precedingOffset,
aggregationStateType,
- inputTypeInfo)
+ inputRowType)
} else {
new ProcTimeBoundedRangeOver(
genFunction,
precedingOffset,
aggregationStateType,
- inputTypeInfo)
+ inputRowType)
}
}
}
@@ -932,7 +939,7 @@ object AggregateUtil {
window: LogicalWindow,
finalRowArity: Int,
properties: Seq[NamedWindowProperty])
- : AllWindowFunction[Row, Row, DataStreamWindow] = {
+ : AllWindowFunction[Row, CRow, DataStreamWindow] = {
if (isTimeWindow(window)) {
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -940,7 +947,7 @@ object AggregateUtil {
startPos,
endPos,
finalRowArity)
- .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+ .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]]
} else {
new IncrementalAggregateAllWindowFunction(
finalRowArity)
@@ -955,8 +962,8 @@ object AggregateUtil {
numGroupingKeys: Int,
numAggregates: Int,
finalRowArity: Int,
- properties: Seq[NamedWindowProperty])
- : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+ properties: Seq[NamedWindowProperty]):
+ WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
if (isTimeWindow(window)) {
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -966,7 +973,7 @@ object AggregateUtil {
startPos,
endPos,
finalRowArity)
- .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+ .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
} else {
new IncrementalAggregateWindowFunction(
numGroupingKeys,
@@ -981,8 +988,9 @@ object AggregateUtil {
inputType: RelDataType,
inputFieldTypeInfo: Seq[TypeInformation[_]],
outputType: RelDataType,
+ groupingKeys: Array[Int],
needMerge: Boolean)
- : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
+ : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
val needRetract = false
val (aggFields, aggregates) =
@@ -1002,7 +1010,7 @@ object AggregateUtil {
aggFields,
aggMapping,
partialResults = false,
- Array(), // no fields are forwarded
+ groupingKeys,
None,
None,
outputArity,
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 95699a2..fabf200 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -56,7 +56,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
extends RichGroupReduceFunction[Row, Row]
with Compiler[GeneratedAggregations] {
- private var collector: TimeWindowPropertyCollector = _
+ private var collector: RowTimeWindowPropertyCollector = _
private val intermediateRowWindowStartPos = keysAndAggregatesArity
private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1
@@ -78,7 +78,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
output = function.createOutputRow()
accumulators = function.createAccumulators()
- collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index a221c53..56ed08a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -47,7 +47,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
extends RichGroupReduceFunction[Row, Row]
with Compiler[GeneratedAggregations] {
- private var collector: TimeWindowPropertyCollector = _
+ private var collector: RowTimeWindowPropertyCollector = _
protected val windowStartPos: Int = keysAndAggregatesArity
private var output: Row = _
@@ -68,7 +68,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
output = function.createOutputRow()
accumulators = function.createAccumulators()
- collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
}
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index f4a1fc5..8af2c2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -46,7 +46,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
extends RichGroupReduceFunction[Row, Row]
with Compiler[GeneratedAggregations] {
- private var collector: TimeWindowPropertyCollector = _
+ private var collector: RowTimeWindowPropertyCollector = _
protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1)
private var output: Row = _
@@ -67,7 +67,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
output = function.createOutputRow()
accumulators = function.createAccumulators()
- collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+ collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
}
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 81c900c..745f24d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.slf4j.LoggerFactory
+import org.apache.flink.table.runtime.types.CRow
/**
* Aggregate Function used for the groupby (without window) aggregate
@@ -35,14 +36,17 @@ import org.slf4j.LoggerFactory
*/
class GroupAggProcessFunction(
private val genAggregations: GeneratedAggregationsFunction,
- private val aggregationStateType: RowTypeInfo)
- extends ProcessFunction[Row, Row]
+ private val aggregationStateType: RowTypeInfo,
+ private val generateRetraction: Boolean)
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
val LOG = LoggerFactory.getLogger(this.getClass)
private var function: GeneratedAggregations = _
- private var output: Row = _
+ private var newRow: CRow = _
+ private var prevRow: CRow = _
+ private var firstRow: Boolean = _
private var state: ValueState[Row] = _
override def open(config: Configuration) {
@@ -54,7 +58,9 @@ class GroupAggProcessFunction(
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+
+ newRow = new CRow(function.createOutputRow(), true)
+ prevRow = new CRow(function.createOutputRow(), false)
val stateDescriptor: ValueStateDescriptor[Row] =
new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
@@ -62,29 +68,53 @@ class GroupAggProcessFunction(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
// get accumulators
var accumulators = state.value()
if (null == accumulators) {
+ firstRow = true
accumulators = function.createAccumulators()
+ } else {
+ firstRow = false
}
// Set group keys value to the final output
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, newRow.row)
+ function.setForwardedFields(input, prevRow.row)
- // accumulate new input row
- function.accumulate(accumulators, input)
+ // Set previous aggregate result to the prevRow
+ function.setAggregationResults(accumulators, prevRow.row)
- // set aggregation results to output
- function.setAggregationResults(accumulators, output)
+ // update aggregate result and set to the newRow
+ if (inputC.change) {
+ // accumulate input
+ function.accumulate(accumulators, input)
+ function.setAggregationResults(accumulators, newRow.row)
+ } else {
+ // retract input
+ function.retract(accumulators, input)
+ function.setAggregationResults(accumulators, newRow.row)
+ }
// update accumulators
state.update(accumulators)
- out.collect(output)
- }
+ // if previousRow is not null, do retraction process
+ if (generateRetraction && !firstRow) {
+ if (prevRow.row.equals(newRow.row)) {
+ // ignore same newRow
+ return
+ } else {
+ // retract previous row
+ out.collect(prevRow)
+ }
+ }
+ out.collect(newRow)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index ec9b654..711cc05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -22,6 +22,7 @@ import java.lang.Iterable
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.util.Collector
/**
@@ -39,17 +40,17 @@ class IncrementalAggregateAllTimeWindowFunction(
extends IncrementalAggregateAllWindowFunction[TimeWindow](
finalRowArity) {
- private var collector: TimeWindowPropertyCollector = _
+ private var collector: CRowTimeWindowPropertyCollector = _
override def open(parameters: Configuration): Unit = {
- collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+ collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
super.open(parameters)
}
override def apply(
window: TimeWindow,
records: Iterable[Row],
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
// set collector and window
collector.wrappedCollector = out
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index f92be92..c190785 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.util.Collector
/**
@@ -32,12 +33,12 @@ import org.apache.flink.util.Collector
*/
class IncrementalAggregateAllWindowFunction[W <: Window](
private val finalRowArity: Int)
- extends RichAllWindowFunction[Row, Row, W] {
+ extends RichAllWindowFunction[Row, CRow, W] {
- private var output: Row = _
+ private var output: CRow = _
override def open(parameters: Configuration): Unit = {
- output = new Row(finalRowArity)
+ output = new CRow(new Row(finalRowArity), true)
}
/**
@@ -47,7 +48,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
override def apply(
window: W,
records: Iterable[Row],
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
val iterator = records.iterator
@@ -55,7 +56,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
val record = iterator.next()
var i = 0
while (i < record.getArity) {
- output.setField(i, record.getField(i))
+ output.row.setField(i, record.getField(i))
i += 1
}
out.collect(output)
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index dccb4f6..809bbfd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.util.Collector
/**
@@ -43,10 +44,10 @@ class IncrementalAggregateTimeWindowFunction(
numAggregates,
finalRowArity) {
- private var collector: TimeWindowPropertyCollector = _
+ private var collector: CRowTimeWindowPropertyCollector = _
override def open(parameters: Configuration): Unit = {
- collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+ collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
super.open(parameters)
}
@@ -54,7 +55,7 @@ class IncrementalAggregateTimeWindowFunction(
key: Tuple,
window: TimeWindow,
records: Iterable[Row],
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
// set collector and window
collector.wrappedCollector = out
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 983efb3..7e9d738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -24,6 +24,7 @@ import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.util.Collector
/**
@@ -37,12 +38,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
private val numGroupingKey: Int,
private val numAggregates: Int,
private val finalRowArity: Int)
- extends RichWindowFunction[Row, Row, Tuple, W] {
+ extends RichWindowFunction[Row, CRow, Tuple, W] {
- private var output: Row = _
+ private var output: CRow = _
override def open(parameters: Configuration): Unit = {
- output = new Row(finalRowArity)
+ output = new CRow(new Row(finalRowArity), true)
}
/**
@@ -53,7 +54,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
key: Tuple,
window: W,
records: Iterable[Row],
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
val iterator = records.iterator
@@ -62,12 +63,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
var i = 0
while (i < numGroupingKey) {
- output.setField(i, key.getField(i))
+ output.row.setField(i, key.getField(i))
i += 1
}
i = 0
while (i < numAggregates) {
- output.setField(numGroupingKey + i, record.getField(i))
+ output.row.setField(numGroupingKey + i, record.getField(i))
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index b63eb81..3fb506f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,7 +31,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import java.util.{ArrayList, List => JList}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
/**
@@ -47,10 +48,10 @@ class ProcTimeBoundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
precedingTimeBoundary: Long,
aggregatesTypeInfo: RowTypeInfo,
- inputType: TypeInformation[Row])
- extends ProcessFunction[Row, Row]
+ inputType: TypeInformation[CRow])
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
- private var output: Row = _
+ private var output: CRow = _
private var accumulatorState: ValueState[Row] = _
private var rowMapState: MapState[Long, JList[Row]] = _
@@ -66,11 +67,12 @@ class ProcTimeBoundedRangeOver(
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
// We keep the elements received in a MapState indexed based on their ingestion time
val rowListTypeInfo: TypeInformation[JList[Row]] =
- new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+ new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+ .asInstanceOf[TypeInformation[JList[Row]]]
val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -82,9 +84,9 @@ class ProcTimeBoundedRangeOver(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ input: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
val currentTime = ctx.timerService.currentProcessingTime
// buffer the event incoming event
@@ -97,15 +99,15 @@ class ProcTimeBoundedRangeOver(
// register timer to process event once the current millisecond passed
ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
}
- rowList.add(input)
+ rowList.add(input.row)
rowMapState.put(currentTime, rowList)
}
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[Row, Row]#OnTimerContext,
- out: Collector[Row]): Unit = {
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
// we consider the original timestamp of events that have registered this time trigger 1 ms ago
val currentTime = timestamp - 1
@@ -166,10 +168,10 @@ class ProcTimeBoundedRangeOver(
val input = currentElements.get(iElemenets)
// set the fields of the last event to carry on with the aggregates
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, output.row)
// add the accumulators values to result
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
out.collect(output)
iElemenets += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index 31cfd73..0c7f44e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import java.util.{List => JList}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
/**
@@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
precedingOffset: Long,
aggregatesTypeInfo: RowTypeInfo,
- inputType: TypeInformation[Row])
- extends ProcessFunction[Row, Row]
+ inputType: TypeInformation[CRow])
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
Preconditions.checkArgument(precedingOffset > 0)
private var accumulatorState: ValueState[Row] = _
private var rowMapState: MapState[Long, JList[Row]] = _
- private var output: Row = _
+ private var output: CRow = _
private var counterState: ValueState[Long] = _
private var smallestTsState: ValueState[Long] = _
@@ -73,13 +74,14 @@ class ProcTimeBoundedRowsOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
// We keep the elements received in a Map state keyed
// by the ingestion time in the operator.
// we also keep counter of processed elements
// and timestamp of oldest element
val rowListTypeInfo: TypeInformation[JList[Row]] =
- new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+ new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+ .asInstanceOf[TypeInformation[JList[Row]]]
val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
@@ -100,9 +102,11 @@ class ProcTimeBoundedRowsOver(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
val currentTime = ctx.timerService.currentProcessingTime
@@ -154,11 +158,11 @@ class ProcTimeBoundedRowsOver(
}
// copy forwarded fields in output row
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, output.row)
// accumulate current row and set aggregate in output row
function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
// update map state, accumulator state, counter and timestamp
val currentTimeState = rowMapState.get(currentTime)
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 75209db..8a23132 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
import org.slf4j.LoggerFactory
/**
@@ -37,12 +38,12 @@ import org.slf4j.LoggerFactory
class ProcTimeUnboundedNonPartitionedOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo)
- extends ProcessFunction[Row, Row]
+ extends ProcessFunction[CRow, CRow]
with CheckpointedFunction
with Compiler[GeneratedAggregations] {
private var accumulators: Row = _
- private var output: Row = _
+ private var output: CRow = _
private var state: ListState[Row] = _
val LOG = LoggerFactory.getLogger(this.getClass)
@@ -58,7 +59,7 @@ class ProcTimeUnboundedNonPartitionedOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
if (null == accumulators) {
val it = state.get().iterator()
if (it.hasNext) {
@@ -70,14 +71,16 @@ class ProcTimeUnboundedNonPartitionedOver(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, output.row)
function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
out.collect(output)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
index 9baa6a3..847c1bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -24,7 +24,8 @@ import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
import org.slf4j.LoggerFactory
/**
@@ -36,10 +37,10 @@ import org.slf4j.LoggerFactory
class ProcTimeUnboundedPartitionedOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo)
- extends ProcessFunction[Row, Row]
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
- private var output: Row = _
+ private var output: CRow = _
private var state: ValueState[Row] = _
val LOG = LoggerFactory.getLogger(this.getClass)
private var function: GeneratedAggregations = _
@@ -54,16 +55,18 @@ class ProcTimeUnboundedPartitionedOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
val stateDescriptor: ValueStateDescriptor[Row] =
new ValueStateDescriptor[Row]("overState", aggregationStateType)
state = getRuntimeContext.getState(stateDescriptor)
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
var accumulators = state.value()
@@ -71,13 +74,12 @@ class ProcTimeUnboundedPartitionedOver(
accumulators = function.createAccumulators()
}
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, output.row)
function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
state.update(accumulators)
-
out.collect(output)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index ef97e71..4020d44 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -17,14 +17,15 @@
*/
package org.apache.flink.table.runtime.aggregate
-import java.util.{List => JList, ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}
import org.slf4j.LoggerFactory
@@ -39,15 +40,15 @@ import org.slf4j.LoggerFactory
*/
class RowTimeBoundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: TypeInformation[Row],
- inputRowType: TypeInformation[Row],
+ aggregationStateType: RowTypeInfo,
+ inputRowType: CRowTypeInfo,
precedingOffset: Long)
- extends ProcessFunction[Row, Row]
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
Preconditions.checkNotNull(aggregationStateType)
Preconditions.checkNotNull(precedingOffset)
- private var output: Row = _
+ private var output: CRow = _
// the state which keeps the last triggering timestamp
private var lastTriggeringTsState: ValueState[Long] = _
@@ -74,7 +75,7 @@ class RowTimeBoundedRangeOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -86,7 +87,8 @@ class RowTimeBoundedRangeOver(
val keyTypeInformation: TypeInformation[Long] =
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
- val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+ val valueTypeInformation: TypeInformation[JList[Row]] =
+ new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
new MapStateDescriptor[Long, JList[Row]](
@@ -98,9 +100,11 @@ class RowTimeBoundedRangeOver(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
// triggering timestamp for trigger calculation
val triggeringTs = ctx.timestamp
@@ -125,8 +129,8 @@ class RowTimeBoundedRangeOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[Row, Row]#OnTimerContext,
- out: Collector[Row]): Unit = {
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
// gets all window data from state for the calculation
val inputs: JList[Row] = dataState.get(timestamp)
@@ -172,13 +176,13 @@ class RowTimeBoundedRangeOver(
}
// set aggregate in output row
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
// copy forwarded fields to output row and emit output row
dataListIndex = 0
while (dataListIndex < inputs.size()) {
aggregatesIndex = 0
- function.setForwardedFields(inputs.get(dataListIndex), output)
+ function.setForwardedFields(inputs.get(dataListIndex), output.row)
out.collect(output)
dataListIndex += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 7169cf7..5ec6ec7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -27,7 +27,8 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
/**
@@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory
class RowTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
- inputRowType: TypeInformation[Row],
+ inputRowType: CRowTypeInfo,
precedingOffset: Long)
- extends ProcessFunction[Row, Row]
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
Preconditions.checkNotNull(aggregationStateType)
Preconditions.checkNotNull(precedingOffset)
- private var output: Row = _
+ private var output: CRow = _
// the state which keeps the last triggering timestamp
private var lastTriggeringTsState: ValueState[Long] = _
@@ -79,7 +80,7 @@ class RowTimeBoundedRowsOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -95,7 +96,8 @@ class RowTimeBoundedRowsOver(
val keyTypeInformation: TypeInformation[Long] =
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
- val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+ val valueTypeInformation: TypeInformation[JList[Row]] =
+ new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
new MapStateDescriptor[Long, JList[Row]](
@@ -107,9 +109,11 @@ class RowTimeBoundedRowsOver(
}
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
// triggering timestamp for trigger calculation
val triggeringTs = ctx.timestamp
@@ -134,8 +138,8 @@ class RowTimeBoundedRowsOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[Row, Row]#OnTimerContext,
- out: Collector[Row]): Unit = {
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
// gets all window data from state for the calculation
val inputs: JList[Row] = dataState.get(timestamp)
@@ -189,7 +193,7 @@ class RowTimeBoundedRowsOver(
}
// copy forwarded fields to output row
- function.setForwardedFields(input, output)
+ function.setForwardedFields(input, output.row)
// retract old row from accumulators
if (null != retractRow) {
@@ -198,7 +202,7 @@ class RowTimeBoundedRowsOver(
// accumulate current row and set aggregate in output row
function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output)
+ function.setAggregationResults(accumulators, output.row)
i += 1
out.collect(output)
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 525d4d7..3e2a811 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -28,7 +28,8 @@ import org.apache.flink.util.{Collector, Preconditions}
import org.apache.flink.api.common.state._
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.slf4j.LoggerFactory
@@ -42,11 +43,11 @@ import org.slf4j.LoggerFactory
abstract class RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[Row])
- extends ProcessFunction[Row, Row]
+ inputType: TypeInformation[CRow])
+ extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
- protected var output: Row = _
+ protected var output: CRow = _
// state to hold the accumulators of the aggregations
private var accumulatorState: ValueState[Row] = _
// state to hold rows until the next watermark arrives
@@ -67,7 +68,7 @@ abstract class RowTimeUnboundedOver(
LOG.debug("Instantiating AggregateHelper.")
function = clazz.newInstance()
- output = function.createOutputRow()
+ output = new CRow(function.createOutputRow(), true)
sortedTimestamps = new util.LinkedList[Long]()
// initialize accumulator state
@@ -76,7 +77,8 @@ abstract class RowTimeUnboundedOver(
accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
// initialize row state
- val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+ val rowListTypeInfo: TypeInformation[JList[Row]] =
+ new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -87,15 +89,17 @@ abstract class RowTimeUnboundedOver(
* Puts an element from the input stream into state if it is not late.
* Registers a timer for the next watermark.
*
- * @param input The input value.
+ * @param inputC The input value.
* @param ctx The ctx to register timer or get current time
* @param out The collector for returning result values.
*
*/
override def processElement(
- input: Row,
- ctx: ProcessFunction[Row, Row]#Context,
- out: Collector[Row]): Unit = {
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val input = inputC.row
val timestamp = ctx.timestamp()
val curWatermark = ctx.timerService().currentWatermark()
@@ -126,11 +130,11 @@ abstract class RowTimeUnboundedOver(
*/
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[Row, Row]#OnTimerContext,
- out: Collector[Row]): Unit = {
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
- Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
- val collector = out.asInstanceOf[TimestampedCollector[Row]]
+ Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
+ val collector = out.asInstanceOf[TimestampedCollector[CRow]]
val keyIterator = rowMapState.keys.iterator
if (keyIterator.hasNext) {
@@ -206,7 +210,7 @@ abstract class RowTimeUnboundedOver(
def processElementsWithSameTimestamp(
curRowList: JList[Row],
lastAccumulator: Row,
- out: Collector[Row]): Unit
+ out: Collector[CRow]): Unit
}
@@ -217,7 +221,7 @@ abstract class RowTimeUnboundedOver(
class RowTimeUnboundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[Row])
+ inputType: TypeInformation[CRow])
extends RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType,
@@ -226,7 +230,7 @@ class RowTimeUnboundedRowsOver(
override def processElementsWithSameTimestamp(
curRowList: JList[Row],
lastAccumulator: Row,
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
var i = 0
while (i < curRowList.size) {
@@ -234,11 +238,11 @@ class RowTimeUnboundedRowsOver(
var j = 0
// copy forwarded fields to output row
- function.setForwardedFields(curRow, output)
+ function.setForwardedFields(curRow, output.row)
// update accumulators and copy aggregates to output row
function.accumulate(lastAccumulator, curRow)
- function.setAggregationResults(lastAccumulator, output)
+ function.setAggregationResults(lastAccumulator, output.row)
// emit output row
out.collect(output)
i += 1
@@ -255,7 +259,7 @@ class RowTimeUnboundedRowsOver(
class RowTimeUnboundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
- inputType: TypeInformation[Row])
+ inputType: TypeInformation[CRow])
extends RowTimeUnboundedOver(
genAggregations: GeneratedAggregationsFunction,
intermediateType,
@@ -264,7 +268,7 @@ class RowTimeUnboundedRangeOver(
override def processElementsWithSameTimestamp(
curRowList: JList[Row],
lastAccumulator: Row,
- out: Collector[Row]): Unit = {
+ out: Collector[CRow]): Unit = {
var i = 0
// all same timestamp data should have same aggregation value.
@@ -281,10 +285,10 @@ class RowTimeUnboundedRangeOver(
val curRow = curRowList.get(i)
// copy forwarded fields to output row
- function.setForwardedFields(curRow, output)
+ function.setForwardedFields(curRow, output.row)
//copy aggregates to output row
- function.setAggregationResults(lastAccumulator, output)
+ function.setAggregationResults(lastAccumulator, output.row)
out.collect(output)
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 9502607..0c8ae00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggregate
import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
@@ -26,29 +27,48 @@ import org.apache.flink.util.Collector
* Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
* collector.
*/
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
- extends Collector[Row] {
+abstract class TimeWindowPropertyCollector[T](
+ windowStartOffset: Option[Int],
+ windowEndOffset: Option[Int])
+ extends Collector[T] {
- var wrappedCollector: Collector[Row] = _
+ var wrappedCollector: Collector[T] = _
+ var output: Row = _
var windowStart:Long = _
var windowEnd:Long = _
- override def collect(record: Row): Unit = {
+ def getRow(record: T): Row
- val lastFieldPos = record.getArity - 1
+ override def collect(record: T): Unit = {
+
+ output = getRow(record)
+ val lastFieldPos = output.getArity - 1
if (windowStartOffset.isDefined) {
- record.setField(
+ output.setField(
lastFieldPos + windowStartOffset.get,
SqlFunctions.internalToTimestamp(windowStart))
}
if (windowEndOffset.isDefined) {
- record.setField(
+ output.setField(
lastFieldPos + windowEndOffset.get,
SqlFunctions.internalToTimestamp(windowEnd))
}
+
wrappedCollector.collect(record)
}
override def close(): Unit = wrappedCollector.close()
}
+
+class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+ extends TimeWindowPropertyCollector[Row](startOffset, endOffset) {
+
+ override def getRow(record: Row): Row = record
+}
+
+class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+ extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) {
+
+ override def getRow(record: CRow): Row = record.row
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
new file mode 100644
index 0000000..ec73fa6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+class CRowValuesInputFormat(
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[CRow])
+ extends GenericInputFormat[CRow]
+ with NonParallelInput
+ with ResultTypeQueryable[CRow]
+ with Compiler[GenericInputFormat[Row]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var format: GenericInputFormat[Row] = _
+
+ override def open(split: GenericInputSplit): Unit = {
+ LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating GenericInputFormat.")
+ format = clazz.newInstance()
+ }
+
+ override def reachedEnd(): Boolean = format.reachedEnd()
+
+ override def nextRecord(reuse: CRow): CRow = {
+ reuse.row = format.nextRecord(reuse.row)
+ reuse.change = true
+ reuse
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index 1a339e6..d536b39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -23,20 +23,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.types.Row
import org.slf4j.LoggerFactory
-class ValuesInputFormat[OUT](
+class ValuesInputFormat(
name: String,
code: String,
- @transient returnType: TypeInformation[OUT])
- extends GenericInputFormat[OUT]
+ @transient returnType: TypeInformation[Row])
+ extends GenericInputFormat[Row]
with NonParallelInput
- with ResultTypeQueryable[OUT]
- with Compiler[GenericInputFormat[OUT]] {
+ with ResultTypeQueryable[Row]
+ with Compiler[GenericInputFormat[Row]] {
val LOG = LoggerFactory.getLogger(this.getClass)
- private var format: GenericInputFormat[OUT] = _
+ private var format: GenericInputFormat[Row] = _
override def open(split: GenericInputSplit): Unit = {
LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
@@ -47,7 +48,7 @@ class ValuesInputFormat[OUT](
override def reachedEnd(): Boolean = format.reachedEnd()
- override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
+ override def nextRecord(reuse: Row): Row = format.nextRecord(reuse)
- override def getProducedType: TypeInformation[OUT] = returnType
+ override def getProducedType: TypeInformation[Row] = returnType
}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
new file mode 100644
index 0000000..25ec8c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.types.Row
+
+/**
+ * Wrapper for a [[Row]] to add retraction information.
+ *
+ * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a
+ * retraction message.
+ *
+ * @param row The wrapped [[Row]].
+ * @param change true for an accumulate message, false for a retraction message.
+ */
+class CRow(var row: Row, var change: Boolean) {
+
+ def this() {
+ this(null, true)
+ }
+
+ override def toString: String = s"${if(change) "+" else "-"}$row"
+
+ override def equals(other: scala.Any): Boolean = {
+ val otherCRow = other.asInstanceOf[CRow]
+ row.equals(otherCRow.row) && change == otherCRow.change
+ }
+}
+
+object CRow {
+
+ def apply(): CRow = {
+ new CRow()
+ }
+
+ def apply(row: Row, change: Boolean): CRow = {
+ new CRow(row, change)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
new file mode 100644
index 0000000..d848c65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
+import org.apache.flink.types.Row
+
+class CRowComparator(val rowComp: TypeComparator[Row]) extends TypeComparator[CRow] {
+
+ override def hash(record: CRow): Int = rowComp.hash(record.row)
+
+ override def setReference(toCompare: CRow): Unit = rowComp.setReference(toCompare.row)
+
+ override def equalToReference(candidate: CRow): Boolean = rowComp.equalToReference(candidate.row)
+
+ override def compareToReference(otherComp: TypeComparator[CRow]): Int = {
+ val otherCRowComp = otherComp.asInstanceOf[CRowComparator]
+ rowComp.compareToReference(otherCRowComp.rowComp)
+ }
+
+ override def compare(first: CRow, second: CRow): Int = {
+ rowComp.compare(first.row, second.row)
+ }
+
+ override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+ rowComp.compareSerialized(firstSource, secondSource)
+ }
+
+ override def supportsNormalizedKey(): Boolean = rowComp.supportsNormalizedKey()
+
+ override def supportsSerializationWithKeyNormalization(): Boolean =
+ rowComp.supportsSerializationWithKeyNormalization()
+
+ override def getNormalizeKeyLen: Int = rowComp.getNormalizeKeyLen
+
+ override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+ rowComp.isNormalizedKeyPrefixOnly(keyBytes)
+
+ override def putNormalizedKey(
+ record: CRow,
+ target: MemorySegment,
+ offset: Int,
+ numBytes: Int): Unit = rowComp.putNormalizedKey(record.row, target, offset, numBytes)
+
+ override def writeWithKeyNormalization(record: CRow, target: DataOutputView): Unit = {
+ rowComp.writeWithKeyNormalization(record.row, target)
+ target.writeBoolean(record.change)
+ }
+
+ override def readWithKeyDenormalization(reuse: CRow, source: DataInputView): CRow = {
+ val row = rowComp.readWithKeyDenormalization(reuse.row, source)
+ reuse.row = row
+ reuse.change = source.readBoolean()
+ reuse
+ }
+
+ override def invertNormalizedKey(): Boolean = rowComp.invertNormalizedKey()
+
+ override def duplicate(): TypeComparator[CRow] = new CRowComparator(rowComp.duplicate())
+
+ override def extractKeys(record: scala.Any, target: Array[AnyRef], index: Int): Int =
+ rowComp.extractKeys(record.asInstanceOf[CRow].row, target, index)
+
+ override def getFlatComparators: Array[TypeComparator[_]] =
+ rowComp.getFlatComparators
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
new file mode 100644
index 0000000..1f56a98
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.Row
+
+class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {
+
+ override def isImmutableType: Boolean = false
+
+ override def duplicate(): TypeSerializer[CRow] = new CRowSerializer(rowSerializer.duplicate())
+
+ override def createInstance(): CRow = new CRow(rowSerializer.createInstance(), true)
+
+ override def copy(from: CRow): CRow = new CRow(rowSerializer.copy(from.row), from.change)
+
+ override def copy(from: CRow, reuse: CRow): CRow = {
+ rowSerializer.copy(from.row, reuse.row)
+ reuse.change = from.change
+ reuse
+ }
+
+ override def getLength: Int = -1
+
+ override def serialize(record: CRow, target: DataOutputView): Unit = {
+ rowSerializer.serialize(record.row, target)
+ target.writeBoolean(record.change)
+ }
+
+ override def deserialize(source: DataInputView): CRow = {
+ val row = rowSerializer.deserialize(source)
+ val change = source.readBoolean()
+ new CRow(row, change)
+ }
+
+ override def deserialize(reuse: CRow, source: DataInputView): CRow = {
+ rowSerializer.deserialize(reuse.row, source)
+ reuse.change = source.readBoolean()
+ reuse
+ }
+
+ override def copy(source: DataInputView, target: DataOutputView): Unit = {
+ rowSerializer.copy(source, target)
+ target.writeBoolean(source.readBoolean())
+ }
+
+ override def canEqual(obj: Any): Boolean = obj.isInstanceOf[CRowSerializer]
+
+ override def equals(obj: Any): Boolean = {
+
+ if (canEqual(obj)) {
+ val other = obj.asInstanceOf[CRowSerializer]
+ rowSerializer.equals(other.rowSerializer)
+ } else {
+ false
+ }
+ }
+
+ override def hashCode: Int = rowSerializer.hashCode() * 13
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
new file mode 100644
index 0000000..456207a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) {
+
+ override def getFieldNames: Array[String] = rowType.getFieldNames
+
+ override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName)
+
+ override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
+ rowType.getTypeAt(fieldExpression)
+
+ override def getTypeAt[X](pos: Int): TypeInformation[X] =
+ rowType.getTypeAt(pos)
+
+ override def getFlatFields(
+ fieldExpression: String,
+ offset: Int,
+ result: util.List[FlatFieldDescriptor]): Unit =
+ rowType.getFlatFields(fieldExpression, offset, result)
+
+ override def isBasicType: Boolean = rowType.isBasicType
+
+ override def isTupleType: Boolean = rowType.isTupleType
+
+ override def getArity: Int = rowType.getArity
+
+ override def getTotalFields: Int = rowType.getTotalFields
+
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] =
+ new CRowSerializer(rowType.createSerializer(config))
+
+ // not implemented because we override createComparator
+ override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null
+
+ override def createComparator(
+ logicalKeyFields: Array[Int],
+ orders: Array[Boolean],
+ logicalFieldOffset: Int,
+ config: ExecutionConfig): TypeComparator[CRow] = {
+
+ val rowComparator = rowType.createComparator(
+ logicalKeyFields,
+ orders,
+ logicalFieldOffset,
+ config)
+
+ new CRowComparator(rowComparator)
+ }
+
+ override def equals(obj: scala.Any): Boolean = {
+ if (this.canEqual(obj)) {
+ rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
+ } else {
+ false
+ }
+ }
+
+ override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo]
+
+}
+
+object CRowTypeInfo {
+
+ def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
+ rowType match {
+ case r: RowTypeInfo => new CRowTypeInfo(r)
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index c37ee74..4a2fcdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,6 +25,7 @@ import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
/**
* A simple [[TableSink]] to emit data as CSV files.
@@ -133,3 +134,4 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
builder.mkString
}
}
+