Posted to issues@flink.apache.org by liurenjie1024 <gi...@git.apache.org> on 2018/03/12 07:49:45 UTC

[GitHub] flink pull request #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupSt...

GitHub user liurenjie1024 opened a pull request:

    https://github.com/apache/flink/pull/5680

    [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

    ## What is the purpose of the change
    
    *Add ProcessFunctionWithCleanupState's counterpart for KeyedProcessFunction.*
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liurenjie1024/flink KeyedProcessFunctionWithCleanupState

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5680
    
----
commit 26915427ba2f3c8e131cbd788c7e4967e69ae2c0
Author: liurenjie1024 <li...@...>
Date:   2018-03-12T07:43:26Z

    KeyedProcessFunctionWithCleanupState

----


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    Can anyone help to merge this?


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    shall we add a unit test?


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    Oh, I see. 
    Thanks for the explanation @liurenjie1024. In the future, it would be good to add such pointers to the description of the PR.



---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    The purpose of unit tests is not only to validate that the new feature works as expected but also to ensure that the functionality is not broken by future changes.
    So even if the code is copied, we should add a test.


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    @bowenli86 @fhueske UT added.
    
    This class is a utility class which can be the base class of many process function implementations in flink table. Its counterpart for the legacy `ProcessFunction` interface, `ProcessFunctionWithCleanupState`, has been inherited by many implementations. In fact, my other [PR](https://github.com/apache/flink/pull/5688) depends on this.


---

[GitHub] flink issue #5680: [FLINK-8919] [Table API & SQL] Add KeyedProcessFunctionWi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    Merging. 
    Thanks for the PR @liurenjie1024 and for helping with the review @bowenli86!


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    @bowenli86 This is a trivial change and most of the code is copied from the non-keyed counterpart, so I don't think we need a test.


---

[GitHub] flink pull request #5680: [FLINK-8919] [Table API & SQL] Add KeyedProcessFun...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5680


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    Can anyone help to merge this?


---

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5680
  
    Taking a step back. What's the purpose of this change? It adds a class to the `flink-table` module that is not used anywhere and not part of the public API of the Table API or SQL.
    Why do we need it?


---

[GitHub] flink pull request #5680: [FLINK-8919] [Table API & SQL] Add KeyedProcessFun...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5680#discussion_r176499309
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.operators
    +
    +import java.lang.{Boolean => JBool}
    +import scala.collection.JavaConversions._
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.operators.KeyedProcessOperator
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.table.runtime.harness.HarnessTestBase
    +import org.apache.flink.util.Collector
    +import org.junit.Test
    +import org.junit.Assert.assertArrayEquals
    +
    +class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
    +  @Test
    +  def testNeedToCleanup(): Unit = {
    +    val queryConfig = new StreamQueryConfig()
    --- End diff --
    
    It would be good if the test checked that the state is actually cleared.


---