You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/29 22:18:08 UTC

flink git commit: [FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.

Repository: flink
Updated Branches:
  refs/heads/release-1.3 001df1979 -> db7f0ffd4


[FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.

This closes #4212.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db7f0ffd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db7f0ffd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db7f0ffd

Branch: refs/heads/release-1.3
Commit: db7f0ffd4f882cdcb294c470a15665752b765816
Parents: 001df19
Author: sunjincheng121 <su...@gmail.com>
Authored: Wed Jun 28 23:19:56 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 30 00:00:15 2017 +0200

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    |  15 +--
 .../table/runtime/aggregate/AggregateUtil.scala |  15 +--
 .../ProcTimeUnboundedNonPartitionedOver.scala   | 115 -------------------
 .../aggregate/ProcTimeUnboundedOver.scala       | 102 ++++++++++++++++
 .../ProcTimeUnboundedPartitionedOver.scala      | 102 ----------------
 .../api/scala/stream/sql/OverWindowITCase.scala |   8 +-
 .../runtime/harness/OverWindowHarnessTest.scala |   4 +-
 7 files changed, 118 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 08f0356..c03dac6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -242,17 +242,10 @@ class DataStreamOverAggregate(
       }
       // non-partitioned aggregation
       else {
-        if (isRowTimeType) {
-          inputDS.keyBy(new NullByteKeySelector[CRow])
-            .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(returnTypeInfo)
-            .name(aggOpName)
-        } else {
-          inputDS
-            .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(returnTypeInfo)
-            .name(aggOpName)
-        }
+        inputDS.keyBy(new NullByteKeySelector[CRow])
+          .process(processFunction).setParallelism(1).setMaxParallelism(1)
+          .returns(returnTypeInfo)
+          .name(aggOpName)
       }
     result
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 2907b99..95991d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -129,17 +129,10 @@ object AggregateUtil {
           queryConfig)
       }
     } else {
-      if (isPartitioned) {
-        new ProcTimeUnboundedPartitionedOver(
-          genFunction,
-          aggregationStateType,
-          queryConfig)
-      } else {
-        new ProcTimeUnboundedNonPartitionedOver(
-          genFunction,
-          aggregationStateType,
-          queryConfig)
-      }
+      new ProcTimeUnboundedOver(
+        genFunction,
+        aggregationStateType,
+        queryConfig)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
deleted file mode 100644
index f86bed2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for non-partitioned processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class ProcTimeUnboundedNonPartitionedOver(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with CheckpointedFunction
-    with Compiler[GeneratedAggregations] {
-
-  private var accumulators: Row = _
-  private var output: CRow = _
-  private var state: ListState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
-                s"Code:\n${genAggregations.code}")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = new CRow(function.createOutputRow(), true)
-    if (null == accumulators) {
-      val it = state.get().iterator()
-      if (it.hasNext) {
-        accumulators = it.next()
-      } else {
-        accumulators = function.createAccumulators()
-      }
-    }
-    initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
-  }
-
-  override def processElement(
-      inputC: CRow,
-      ctx: ProcessFunction[CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-    // register state-cleanup timer
-    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-
-    val input = inputC.row
-
-    function.setForwardedFields(input, output.row)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output.row)
-
-    out.collect(output)
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
-    out: Collector[CRow]): Unit = {
-
-    if (needToCleanupState(timestamp)) {
-      cleanupState(state)
-    }
-  }
-
-  override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    state.clear()
-    if (null != accumulators) {
-      state.add(accumulators)
-    }
-  }
-
-  override def initializeState(context: FunctionInitializationContext): Unit = {
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
-    state = context.getOperatorStateStore.getListState(accumulatorsDescriptor)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
new file mode 100644
index 0000000..7a7b44d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for processing-time unbounded OVER window
+  *
+  * @param genAggregations Generated aggregate helper function
+  * @param aggregationStateType     row type info of aggregation
+  */
+class ProcTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+    with Compiler[GeneratedAggregations] {
+
+  private var output: CRow = _
+  private var state: ValueState[Row] = _
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = new CRow(function.createOutputRow(), true)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+
+    initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
+  }
+
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
+    val input = inputC.row
+
+    var accumulators = state.value()
+
+    if (null == accumulators) {
+      accumulators = function.createAccumulators()
+    }
+
+    function.setForwardedFields(input, output.row)
+
+    function.accumulate(accumulators, input)
+    function.setAggregationResults(accumulators, output.row)
+
+    state.update(accumulators)
+    out.collect(output)
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
+
+    if (needToCleanupState(timestamp)) {
+      cleanupState(state)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
deleted file mode 100644
index ad43d94..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.CRow
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class ProcTimeUnboundedPartitionedOver(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
-
-  private var output: CRow = _
-  private var state: ValueState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
-                s"Code:\n${genAggregations.code}")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = new CRow(function.createOutputRow(), true)
-    val stateDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("overState", aggregationStateType)
-    state = getRuntimeContext.getState(stateDescriptor)
-
-    initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
-  }
-
-  override def processElement(
-    inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
-    out: Collector[CRow]): Unit = {
-
-    // register state-cleanup timer
-    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-    val input = inputC.row
-
-    var accumulators = state.value()
-
-    if (null == accumulators) {
-      accumulators = function.createAccumulators()
-    }
-
-    function.setForwardedFields(input, output.row)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output.row)
-
-    state.update(accumulators)
-    out.collect(output)
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
-    out: Collector[CRow]): Unit = {
-
-    if (needToCleanupState(timestamp)) {
-      cleanupState(state)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 64b7132..8abe561 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.api.scala.stream.sql
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
@@ -216,6 +217,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
   @Test
   def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val queryConfig =
+      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -234,7 +238,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db7f0ffd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 786a843..8cad64f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.runtime.harness
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.{ConcurrentLinkedQueue}
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -276,7 +276,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   def testProcTimeUnboundedOver(): Unit = {
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeUnboundedPartitionedOver(
+      new ProcTimeUnboundedOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         queryConfig))