Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/06/10 08:53:31 UTC

[GitHub] flink pull request: [FLINK-2191] Fix inconsistent use of closure c...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/813

    [FLINK-2191] Fix inconsistent use of closure cleaner in Scala Streaming

    The closure cleaner still cannot be disabled for the Timestamp extractor
    in Time and for the delta function in Delta (windowing helpers).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink closure-cleaner-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/813.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #813
    
----
commit 07ad8dfb0d766a40fb27611f9c8c9a155c48c6c8
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-06-09T14:17:48Z

    [FLINK-2191] Fix inconsistent use of closure cleaner in Scala Streaming
    
    The closure cleaner still cannot be disabled for the Timestamp extractor
    in Time and for the delta function in Delta (windowing helpers).

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---


Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/813#discussion_r32102485
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
    @@ -455,15 +455,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
        */
       def getStreamGraph = javaEnv.getStreamGraph
     
    +  /**
    +   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
    +   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
    +   */
    +  private[flink] def clean[F <: AnyRef](f: F): F = {
    +    if (getConfig.isClosureCleanerEnabled) {
    +      ClosureCleaner.clean(f, true)
    +    } else {
    +      ClosureCleaner.ensureSerializable(f)
    +    }
    +    f
    +  }
    --- End diff --
    
    I would suggest having just this one clean function.
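The clean function in the diff above gates closure cleaning on the ExecutionConfig flag but still runs a serializability check when cleaning is disabled, so turning the cleaner off does not let non-serializable functions through unnoticed. The Scala sketch below mirrors that shape under stated assumptions: CleanerConfig, CleanSketch, and cleanClosure are hypothetical names; ensureSerializable here is a plain Java-serialization check, not Flink's ClosureCleaner.ensureSerializable; and cleanClosure is a stub, since a real closure cleaner also strips unneeded references to enclosing objects (the `true` argument in the diff appears to request the same check after cleaning).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the flag behind getConfig.isClosureCleanerEnabled.
final case class CleanerConfig(closureCleanerEnabled: Boolean)

object CleanSketch {
  // Serializes f with plain Java serialization and fails fast if that is
  // impossible (similar in spirit to ClosureCleaner.ensureSerializable).
  def ensureSerializable(f: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(f)
    } catch {
      case e: NotSerializableException =>
        throw new IllegalArgumentException(s"Function is not serializable: ${e.getMessage}", e)
    } finally {
      out.close()
    }
  }

  // Stub for real closure cleaning: a real cleaner would first drop unneeded
  // references to enclosing objects; this sketch only runs the check.
  def cleanClosure(f: AnyRef): Unit = ensureSerializable(f)

  // Same shape as clean[F] in the diff: the config decides whether to clean,
  // a serializability check happens either way, and f is returned as-is.
  def clean[F <: AnyRef](f: F, config: CleanerConfig): F = {
    if (config.closureCleanerEnabled) cleanClosure(f)
    else ensureSerializable(f)
    f
  }
}
```

With cleaning disabled, CleanSketch.clean(new Object, CleanerConfig(closureCleanerEnabled = false)) still throws, which is the behavior the else branch is there to preserve.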


---


Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/813#issuecomment-110739702
  
    Looks good to me :+1: 


---


Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/813#issuecomment-110786608
  
    Merging.


---


Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/813#discussion_r32102450
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -781,11 +780,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
         if (fun == null) {
           throw new NullPointerException("Sink function must not be null.")
         }
    +    val cleanFun = clean(fun)
         val sinkFunction = new SinkFunction[T] {
    -      val cleanFun = clean(fun)
           def invoke(in: T) = cleanFun(in)
         }
         this.addSink(sinkFunction)
       }
     
    +  /**
    +   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
    +   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
    +   */
    +  private[flink] def clean[F <: AnyRef](f: F): F = {
    --- End diff --
    
    Why do we need multiple clean functions? I think Scala's package-qualified visibility (private[flink]) solves that. :)
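The DataStream.scala hunk above moves `val cleanFun = clean(fun)` out of the anonymous SinkFunction and into the enclosing method. Cleaning now happens before the anonymous class is instantiated, and the class body refers only to the already-cleaned local value instead of invoking clean on the enclosing DataStream during its own initialization. A minimal sketch of that pattern, assuming hypothetical stand-ins: SimpleSink plays SinkFunction, and SinkSketch.clean only counts calls rather than cleaning anything.

```scala
// Hypothetical stand-in for Flink's SinkFunction interface.
trait SimpleSink[T] extends java.io.Serializable {
  def invoke(in: T): Unit
}

object SinkSketch {
  // Counts clean() calls; demo bookkeeping only.
  var cleanCalls: Int = 0

  // Stand-in for clean(): a real version would run the closure cleaner.
  def clean[F <: AnyRef](f: F): F = {
    cleanCalls += 1
    f
  }

  // Mirrors the diff: clean once in the enclosing method, then build the
  // anonymous sink, whose body only refers to the local cleanFun.
  def sinkFrom[T](fun: T => Unit): SimpleSink[T] = {
    if (fun == null) {
      throw new NullPointerException("Sink function must not be null.")
    }
    val cleanFun = clean(fun)
    new SimpleSink[T] {
      def invoke(in: T): Unit = cleanFun(in)
    }
  }
}
```

Each invoke on the returned sink forwards the element to the cleaned function, while clean itself ran exactly once, when sinkFrom was called.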


---


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/813#discussion_r32103307
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
    @@ -455,15 +455,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
        */
       def getStreamGraph = javaEnv.getStreamGraph
     
    +  /**
    +   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
    +   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
    +   */
    +  private[flink] def clean[F <: AnyRef](f: F): F = {
    +    if (getConfig.isClosureCleanerEnabled) {
    +      ClosureCleaner.clean(f, true)
    +    } else {
    +      ClosureCleaner.ensureSerializable(f)
    +    }
    +    f
    +  }
    --- End diff --
    
    I will look into it. The problem is that the DataStream and all the other classes don't have a reference to the Scala StreamExecutionEnvironment.


---


Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/813


---


Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/813#issuecomment-110733849
  
    I updated it. We still have several "clean" functions, but now they just forward to the StreamingExEnv.scalaClean() function.
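The final update keeps a clean function in each API class but has it forward to a single StreamingExEnv.scalaClean(), which sidesteps the concern that DataStream and the other classes hold no reference to the Scala StreamExecutionEnvironment. The thread does not show how each class reaches that environment; the sketch below assumes it wraps whichever Java-side environment the class can already reach. JavaEnv, ScalaEnv, DataStreamStub, transform, and CleanLog are hypothetical stand-ins, and CleanLog only records which branch ran in place of real cleaning.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the Java-side environment that every Scala API
// class can already reach; only the cleaner flag matters here.
final class JavaEnv(val closureCleanerEnabled: Boolean)

// Records which branch scalaClean took (demo only; stands in for real cleaning).
object CleanLog {
  val entries: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// Stand-in for the Scala StreamExecutionEnvironment: the single place that
// decides between cleaning and a plain serializability check.
final class ScalaEnv(javaEnv: JavaEnv) {
  def scalaClean[F <: AnyRef](f: F): F = {
    val mode = if (javaEnv.closureCleanerEnabled) "clean" else "check"
    CleanLog.entries += mode
    f
  }
}

// Stand-in for a Scala API class (e.g. a DataStream wrapper) that holds only
// its Java-side environment. Its private clean() forwards to scalaClean by
// wrapping that environment on demand.
final class DataStreamStub(javaEnv: JavaEnv) {
  private def clean[F <: AnyRef](f: F): F = new ScalaEnv(javaEnv).scalaClean(f)

  // Hypothetical transformation that cleans the user function before use.
  def transform(fun: Int => Int): Int => Int = clean(fun)
}
```

Because the wrapper is created on demand, the cleaning policy lives in one method while no API class needs to store a reference to a Scala environment.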

