You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/29 21:58:23 UTC
[3/3] flink git commit: [FLINK-7025] [table] Port non-partitioned
unbounded proctime Over window to keyed state.
[FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.
This closes #4212.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55ab34ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55ab34ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55ab34ff
Branch: refs/heads/master
Commit: 55ab34ff3b92b2567bcab80f39826a9667f1c9ce
Parents: 1cf1620
Author: sunjincheng121 <su...@gmail.com>
Authored: Wed Jun 28 23:19:56 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 29 23:58:18 2017 +0200
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 15 +--
.../table/runtime/aggregate/AggregateUtil.scala | 15 +--
.../ProcTimeUnboundedNonPartitionedOver.scala | 115 -------------------
.../aggregate/ProcTimeUnboundedOver.scala | 102 ++++++++++++++++
.../ProcTimeUnboundedPartitionedOver.scala | 102 ----------------
.../api/scala/stream/sql/OverWindowITCase.scala | 8 +-
.../runtime/harness/OverWindowHarnessTest.scala | 4 +-
7 files changed, 118 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 08f0356..c03dac6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -242,17 +242,10 @@ class DataStreamOverAggregate(
}
// non-partitioned aggregation
else {
- if (isRowTimeType) {
- inputDS.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(returnTypeInfo)
- .name(aggOpName)
- } else {
- inputDS
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(returnTypeInfo)
- .name(aggOpName)
- }
+ inputDS.keyBy(new NullByteKeySelector[CRow])
+ .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .returns(returnTypeInfo)
+ .name(aggOpName)
}
result
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 04c5070..f4ead48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -129,17 +129,10 @@ object AggregateUtil {
queryConfig)
}
} else {
- if (isPartitioned) {
- new ProcTimeUnboundedPartitionedOver(
- genFunction,
- aggregationStateType,
- queryConfig)
- } else {
- new ProcTimeUnboundedNonPartitionedOver(
- genFunction,
- aggregationStateType,
- queryConfig)
- }
+ new ProcTimeUnboundedOver(
+ genFunction,
+ aggregationStateType,
+ queryConfig)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
deleted file mode 100644
index f86bed2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for non-partitioned processing-time unbounded OVER window
- *
- * @param genAggregations Generated aggregate helper function
- * @param aggregationStateType row type info of aggregation
- */
-class ProcTimeUnboundedNonPartitionedOver(
- genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo,
- queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
- with CheckpointedFunction
- with Compiler[GeneratedAggregations] {
-
- private var accumulators: Row = _
- private var output: CRow = _
- private var state: ListState[Row] = _
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: GeneratedAggregations = _
-
- override def open(config: Configuration) {
- LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
- s"Code:\n${genAggregations.code}")
- val clazz = compile(
- getRuntimeContext.getUserCodeClassLoader,
- genAggregations.name,
- genAggregations.code)
- LOG.debug("Instantiating AggregateHelper.")
- function = clazz.newInstance()
-
- output = new CRow(function.createOutputRow(), true)
- if (null == accumulators) {
- val it = state.get().iterator()
- if (it.hasNext) {
- accumulators = it.next()
- } else {
- accumulators = function.createAccumulators()
- }
- }
- initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
- }
-
- override def processElement(
- inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- // register state-cleanup timer
- registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-
- val input = inputC.row
-
- function.setForwardedFields(input, output.row)
-
- function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output.row)
-
- out.collect(output)
- }
-
- override def onTimer(
- timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
-
- if (needToCleanupState(timestamp)) {
- cleanupState(state)
- }
- }
-
- override def snapshotState(context: FunctionSnapshotContext): Unit = {
- state.clear()
- if (null != accumulators) {
- state.add(accumulators)
- }
- }
-
- override def initializeState(context: FunctionInitializationContext): Unit = {
- val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
- state = context.getOperatorStateStore.getListState(accumulatorsDescriptor)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
new file mode 100644
index 0000000..7a7b44d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
+import org.slf4j.LoggerFactory
+
+/**
+ * Process Function for processing-time unbounded OVER window.
+ *
+ * For every input row the accumulators are read from value state (and created
+ * through the generated helper when none are stored yet), updated with the row,
+ * written back to state, and a single output row is emitted.
+ *
+ * @param genAggregations Generated aggregate helper function
+ * @param aggregationStateType row type info of aggregation
+ * @param queryConfig query configuration handed to [[ProcessFunctionWithCleanupState]]
+ */
+class ProcTimeUnboundedOver(
+ genAggregations: GeneratedAggregationsFunction,
+ aggregationStateType: RowTypeInfo,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ with Compiler[GeneratedAggregations] {
+
+ private var output: CRow = _ // output record, created once in open() and emitted for every input row
+ private var state: ValueState[Row] = _ // accumulator row, obtained from the runtime context in open()
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _ // generated helper, compiled and instantiated in open()
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
+ s"Code:\n${genAggregations.code}")
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ output = new CRow(function.createOutputRow(), true)
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("overState", aggregationStateType)
+ state = getRuntimeContext.getState(stateDescriptor)
+
+ initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime") // NOTE(review): name still says "Partitioned" although this class now serves both cases; presumably kept from the former ProcTimeUnboundedPartitionedOver - confirm this is intentional
+ }
+
+ override def processElement(
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
+ val input = inputC.row
+
+ var accumulators = state.value() // may be null; handled by the check below
+
+ if (null == accumulators) {
+ accumulators = function.createAccumulators()
+ }
+
+ function.setForwardedFields(input, output.row)
+
+ function.accumulate(accumulators, input)
+ function.setAggregationResults(accumulators, output.row)
+
+ state.update(accumulators) // write the updated accumulators back to state
+ out.collect(output)
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (needToCleanupState(timestamp)) { // decision logic lives in ProcessFunctionWithCleanupState (not shown here)
+ cleanupState(state)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
deleted file mode 100644
index ad43d94..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.CRow
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for processing-time unbounded OVER window
- *
- * @param genAggregations Generated aggregate helper function
- * @param aggregationStateType row type info of aggregation
- */
-class ProcTimeUnboundedPartitionedOver(
- genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo,
- queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
- with Compiler[GeneratedAggregations] {
-
- private var output: CRow = _
- private var state: ValueState[Row] = _
- val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: GeneratedAggregations = _
-
- override def open(config: Configuration) {
- LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
- s"Code:\n${genAggregations.code}")
- val clazz = compile(
- getRuntimeContext.getUserCodeClassLoader,
- genAggregations.name,
- genAggregations.code)
- LOG.debug("Instantiating AggregateHelper.")
- function = clazz.newInstance()
-
- output = new CRow(function.createOutputRow(), true)
- val stateDescriptor: ValueStateDescriptor[Row] =
- new ValueStateDescriptor[Row]("overState", aggregationStateType)
- state = getRuntimeContext.getState(stateDescriptor)
-
- initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
- }
-
- override def processElement(
- inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
-
- // register state-cleanup timer
- registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
- val input = inputC.row
-
- var accumulators = state.value()
-
- if (null == accumulators) {
- accumulators = function.createAccumulators()
- }
-
- function.setForwardedFields(input, output.row)
-
- function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output.row)
-
- state.update(accumulators)
- out.collect(output)
- }
-
- override def onTimer(
- timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
-
- if (needToCleanupState(timestamp)) {
- cleanupState(state)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7a24f50..397e72c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.api.scala.stream.sql
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
@@ -217,6 +218,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
@Test
def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+ val queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -235,7 +239,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 786a843..8cad64f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.runtime.harness
import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.{ConcurrentLinkedQueue}
+import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -276,7 +276,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
def testProcTimeUnboundedOver(): Unit = {
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
- new ProcTimeUnboundedPartitionedOver(
+ new ProcTimeUnboundedOver(
genMinMaxAggFunction,
minMaxAggregationStateType,
queryConfig))