Posted to issues@flink.apache.org by xccui <gi...@git.apache.org> on 2017/08/11 17:30:32 UTC

[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/4530

    [FLINK-7245] [stream] Support holding back watermarks with static delays

    ## What is the purpose of the change
    
    *This pull request allows operators to hold back watermarks by **static** delays.*
    
    
    ## Brief change log
    
      - *Introduce a new method `getWatermarkToEmit(Watermark inputWatermark)`, which allows generating a new watermark with a different timestamp before it is emitted.*
      - *Add two operators `KeyedProcessOperatorWithWatermarkDelay` and `KeyedCoProcessOperatorWithWatermarkDelay` that support holding back watermarks with static delays.*
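    A minimal, Flink-free sketch of the hook described above. `Watermark`, `BaseOperator`, `DelayingOperator` and the `emitted` buffer are hypothetical stand-ins (not Flink's `Watermark` or `AbstractStreamOperator`); only the shape of `getWatermarkToEmit` follows the change log, and timer handling is left out of the model.

    ```scala
    // Hypothetical stand-in for org.apache.flink.streaming.api.watermark.Watermark.
    final case class Watermark(timestamp: Long)

    // Hypothetical simplified base operator (not Flink's AbstractStreamOperator).
    class BaseOperator {
      // Collects every watermark the operator forwards downstream.
      val emitted = scala.collection.mutable.ArrayBuffer.empty[Watermark]

      // Hook: by default the incoming watermark is forwarded unchanged.
      protected def getWatermarkToEmit(inputWatermark: Watermark): Watermark = inputWatermark

      def processWatermark(mark: Watermark): Unit = {
        // Timer handling is omitted in this model; only the forwarded watermark is shown.
        emitted += getWatermarkToEmit(mark)
      }
    }

    // Holds back every forwarded watermark by a static delay.
    class DelayingOperator(watermarkDelay: Long) extends BaseOperator {
      override protected def getWatermarkToEmit(inputWatermark: Watermark): Watermark =
        if (watermarkDelay == 0L) inputWatermark
        else Watermark(inputWatermark.timestamp - watermarkDelay)
    }

    val demo = new DelayingOperator(100L)
    demo.processWatermark(Watermark(101L)) // forwards Watermark(1)
    demo.processWatermark(Watermark(150L)) // forwards Watermark(50)
    ```

    Subclasses that do not override the hook behave exactly like the base class.
    
    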
    
    ## Verifying this change
    
    This change is verified by two new test classes, `KeyedProcessOperatorWithWatermarkDelayTest` and `KeyedCoProcessOperatorWithWatermarkDelayTest`. They test whether the two added operators hold back received watermarks by the given delays and whether the provided delays are checked to be non-negative.
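    The non-negativity check can be sketched as a constructor precondition. This is an assumption about how such a check might look: it uses Scala's `require`, which throws `IllegalArgumentException` (the exception type named later in the review), and `HeldBackOperator` is a hypothetical, framework-free class.

    ```scala
    import scala.util.{Failure, Try}

    // Hypothetical operator model that rejects negative static delays at construction time.
    final class HeldBackOperator(val watermarkDelay: Long) {
      // require throws IllegalArgumentException when the condition is false.
      require(watermarkDelay >= 0, s"The watermark delay must be non-negative, but was $watermarkDelay.")

      // Timestamp of the held-back watermark for an incoming watermark timestamp.
      def heldBack(inputTimestamp: Long): Long = inputTimestamp - watermarkDelay
    }

    val accepted = new HeldBackOperator(100L).heldBack(101L) // 1

    // Outside a test framework, the rejection can be observed with Try.
    val rejectedWithIae = Try(new HeldBackOperator(-1L)) match {
      case Failure(_: IllegalArgumentException) => true
      case _                                     => false
    }
    ```
    
    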
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (**no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
      - The serializers: (**no**)
      - The runtime per-record code paths (performance sensitive): (**no**)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes**)
      - If yes, how is the feature documented? (**JavaDocs**)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-7245

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4530
    
----
commit a24a11522af54c547d014d30adbefa23997d0f8d
Author: Xingcan Cui <xi...@gmail.com>
Date:   2017-08-09T12:54:16Z

    [FLINK-7245] [stream] Support holding back watermarks with static delays

commit f730ab45c88f8bcbc27e411901e27dee84aa26b2
Author: Xingcan Cui <xi...@gmail.com>
Date:   2017-08-11T16:15:53Z

    Refine codes

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    I think the new operators can go into the Table API packages. They are not usable from the public `DataStream API`.



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    I see, I didn't consider the private `timeServiceManager`. Maybe this is the best approach then.



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    I think the approach is OK, though I personally would have preferred to make the instance variable protected and override `processWatermark()` to minimise the impact on `AbstractStreamOperator`, which now has an extra method call (though that will probably be inlined).
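    A Flink-free sketch of the two options, under simplifying assumptions: `HookBase`/`HookDelay` model the approach in this PR (the base class calls an overridable hook, the time service stays private), while `OverrideBase`/`OverrideDelay` model the alternative suggested here (protected field, subclass overrides `processWatermark()` and duplicates its body). All class names are hypothetical, and the assumption that timers advance with the undelayed watermark is specific to this sketch.

    ```scala
    // Hypothetical stand-in for Flink's Watermark.
    final case class Watermark(timestamp: Long)

    // Hypothetical stand-in for the time service manager: records what timers saw.
    final class TimeService {
      val advancedTo = scala.collection.mutable.ArrayBuffer.empty[Long]
      def advanceWatermark(mark: Watermark): Unit = advancedTo += mark.timestamp
    }

    // Option A: the base class calls a hook on every watermark (one extra call each time).
    class HookBase {
      private val timeService = new TimeService // stays private
      val output = scala.collection.mutable.ArrayBuffer.empty[Watermark]
      protected def getWatermarkToEmit(mark: Watermark): Watermark = mark
      def processWatermark(mark: Watermark): Unit = {
        timeService.advanceWatermark(mark)
        output += getWatermarkToEmit(mark)
      }
      def timerWatermarks: List[Long] = timeService.advancedTo.toList
    }

    class HookDelay(delay: Long) extends HookBase {
      override protected def getWatermarkToEmit(mark: Watermark): Watermark =
        Watermark(mark.timestamp - delay)
    }

    // Option B: the field is protected and the subclass overrides processWatermark itself.
    class OverrideBase {
      protected val timeService = new TimeService
      val output = scala.collection.mutable.ArrayBuffer.empty[Watermark]
      def processWatermark(mark: Watermark): Unit = {
        timeService.advanceWatermark(mark)
        output += mark
      }
      def timerWatermarks: List[Long] = timeService.advancedTo.toList
    }

    class OverrideDelay(delay: Long) extends OverrideBase {
      override def processWatermark(mark: Watermark): Unit = {
        timeService.advanceWatermark(mark) // duplicated from the base class
        output += Watermark(mark.timestamp - delay)
      }
    }
    ```

    Both variants forward the same delayed watermark; the difference is whether every operator pays for the hook call on its watermark path (A) or each delaying subclass repeats a few lines of the base implementation (B).
    
    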



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    Thanks for the comment, @aljoscha. IMO, making the `timeServiceManager` protected would indeed minimise the impact on `AbstractStreamOperator`, but it may introduce duplicated code in the subclasses. It's a trade-off.



[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4530#discussion_r133842744
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.operators
    +
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
    +import org.apache.flink.streaming.api.watermark.Watermark
    +
    +/**
    +  * A {@link org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator} that supports
    +  * holding back watermarks with a static delays.
    +  */
    +class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
    +  private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
    +  private val watermarkDelay1: Long = 0L,
    +  // The watermarkDelay2 is useless now
    +  private var watermarkDelay2: Long = 0L)
    --- End diff --
    
    Since this is an internal class, we don't need to be concerned about changing the interface later. I'd suggest removing `watermarkDelay2` and adding it later when we need it.
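    With `watermarkDelay2` removed, a single delay applies to the operator's combined watermark. The sketch below is a Flink-free model, assuming the two-input operator combines its inputs the way `AbstractStreamOperator` does (minimum of both input watermarks, forwarded only when it increases); `TwoInputDelayModel` is hypothetical. Fed the inputs used in `KeyedCoProcessOperatorWithWatermarkDelayTest`, it yields that test's expected watermarks 1 and 3.

    ```scala
    // Hypothetical stand-in for Flink's Watermark.
    final case class Watermark(timestamp: Long)

    // Hypothetical two-input model with one static delay on the combined watermark.
    final class TwoInputDelayModel(watermarkDelay: Long) {
      require(watermarkDelay >= 0, "watermarkDelay must be non-negative")
      private var input1Watermark = Long.MinValue
      private var input2Watermark = Long.MinValue
      private var combinedWatermark = Long.MinValue
      val emitted = scala.collection.mutable.ArrayBuffer.empty[Watermark]

      def processWatermark1(mark: Watermark): Unit = { input1Watermark = mark.timestamp; advance() }
      def processWatermark2(mark: Watermark): Unit = { input2Watermark = mark.timestamp; advance() }

      // The combined watermark is the minimum of both inputs and only moves forward.
      private def advance(): Unit = {
        val newMin = math.min(input1Watermark, input2Watermark)
        if (newMin > combinedWatermark) {
          combinedWatermark = newMin
          emitted += Watermark(combinedWatermark - watermarkDelay)
        }
      }
    }

    val model = new TwoInputDelayModel(100L)
    model.processWatermark1(Watermark(101L)) // input 2 still at Long.MinValue: nothing emitted
    model.processWatermark2(Watermark(202L)) // min = 101, emits 101 - 100 = 1
    model.processWatermark1(Watermark(103L)) // min = 103, emits 3
    model.processWatermark2(Watermark(204L)) // min still 103: nothing emitted
    ```
    
    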



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    Thanks for the comments @fhueske. I will pay more attention to the coding style. 
    
    Actually, there are many ways to implement this feature. At first, I planned to override the `processWatermark` method in the subclass. However, the `timeServiceManager` instance variable it needs is declared private, and I'm not sure whether that can be changed. 



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    I'm fine with the changes. +1 to merge



[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4530#discussion_r133843854
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.operators
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
    +import org.apache.flink.util.{Collector, TestLogger}
    +
    +import org.junit.{Assert, Test}
    +
    +/**
    +  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
    +  */
    +class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
    --- End diff --
    
    The comments on the other test apply here as well.



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    Good proposal @xccui! I'd prefer the plural: `org.apache.flink.table.runtime.operators`



[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4530



[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4530#discussion_r133843436
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.operators
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TestHarnessUtil}
    +import org.apache.flink.util.{Collector, TestLogger}
    +
    +import org.junit.{Assert, Test}
    +
    +/**
    +  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
    +  */
    +class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
    +
    +  @Test
    +  def testHoldingBackWatermarks(): Unit = {
    +    val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
    +      new EmptyCoProcessFunction, 100)
    +    val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String](
    +      operator, new IntToStringKeySelector, new CoIdentityKeySelector[String],
    +      BasicTypeInfo.STRING_TYPE_INFO)
    +    testHarness.setup()
    +    testHarness.open()
    +    testHarness.processWatermark1(new Watermark(101))
    +    testHarness.processWatermark2(new Watermark(202))
    +    testHarness.processWatermark1(new Watermark(103))
    +    testHarness.processWatermark2(new Watermark(204))
    +    val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
    +    expectedOutput.add(new Watermark(1))
    +    expectedOutput.add(new Watermark(3))
    +    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput,
    --- End diff --
    
    Please format parameter lists that do not fit into one line as follows:
    
    ```
    TestHarnessUtil.assertOutputEquals(
      "Output was not correct.", 
      expectedOutput, 
      testHarness.getOutput)
    ```



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    @fhueske Yes, the plural is better. I should have noticed that before. 
    This PR is updated with the new package name and an extra delay parameter added to the co-operator.



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    I totally understand the choice, @fhueske 😄 
    Thanks for the refactoring. 



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    Thanks for the suggestion, @aljoscha. Do you think it's appropriate to add a new package `org.apache.flink.table.runtime.operator`?



[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4530
  
    You're right @xccui, this is a trade-off. 
    
    I thought about this again and agree with @aljoscha that it would be better to avoid the additional method call. The `processWatermark()` method is called many times in every DataStream program, and the duplicated code amounts to fewer than 10 lines in 4 classes.
    
    I will do the refactoring and merge this PR. 
    
    Thanks, Fabian



[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4530#discussion_r133843667
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.operators
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TestHarnessUtil}
    +import org.apache.flink.util.{Collector, TestLogger}
    +
    +import org.junit.{Assert, Test}
    +
    +/**
    +  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
    +  */
    +class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
    +
    +  @Test
    +  def testHoldingBackWatermarks(): Unit = {
    +    val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, Integer, String, String](
    +      new EmptyCoProcessFunction, 100)
    +    val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, Integer, String, String](
    +      operator, new IntToStringKeySelector, new CoIdentityKeySelector[String],
    +      BasicTypeInfo.STRING_TYPE_INFO)
    +    testHarness.setup()
    +    testHarness.open()
    +    testHarness.processWatermark1(new Watermark(101))
    +    testHarness.processWatermark2(new Watermark(202))
    +    testHarness.processWatermark1(new Watermark(103))
    +    testHarness.processWatermark2(new Watermark(204))
    +    val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
    +    expectedOutput.add(new Watermark(1))
    +    expectedOutput.add(new Watermark(3))
    +    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput,
    +      testHarness.getOutput)
    +    testHarness.close()
    +  }
    +
    +  @Test
    --- End diff --
    
    Use `@Test(expected = classOf[IllegalArgumentException])` instead of a try-catch with `isInstanceOf`.

