Posted to issues@flink.apache.org by dianfu <gi...@git.apache.org> on 2017/06/23 09:50:12 UTC

[GitHub] flink pull request #4172: [FLINK-6983] [cep] Do not serialize States with NF...

GitHub user dianfu opened a pull request:

    https://github.com/apache/flink/pull/4172

    [FLINK-6983] [cep] Do not serialize States with NFA

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink OptimizeNFASerializer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4172.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4172
    
----
commit a56a5429ceea55a07b9c5520d044e0582e352f0e
Author: Dian Fu <fu...@alibaba-inc.com>
Date:   2017-06-23T09:46:35Z

    [FLINK-6983] [cep] Do not serialize States with NFA

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @dianfu Here is a plan that could avoid having to generate the code of the `IterativeCondition` every time. It came out of a discussion with @fhueske, who also explained to me how things are done on the SQL side. So here we go:
    
    1) You start with a `String` that contains the user's query.
    2) You analyze the query and generate the code that corresponds to the `IterativeCondition`.
        2i) The code goes into a `Wrapper` class, which extends `IterativeCondition` and keeps only two `String`s: the code of the condition and a unique name for the condition.
        2ii) This `Wrapper` is passed to the `Pattern` and the `CEPOperator`.
    **THE JOB IS SUBMITTED**
    3) In `open()`, the `CEPOperator` checks whether the states hold actual `IterativeCondition`s or `Wrapper`s (`Wrapper` being a subclass of `IterativeCondition`), and for each `Wrapper`:
        3i) it takes the name and the code and compiles the code into an actual `IterativeCondition`, which is then stored instead of the `Wrapper`.
    
    This way we compile only once, in the `open()` of the operator, rather than every time.
    What do you think?
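A minimal sketch of steps 2i to 3i above, in plain Java so it runs without Flink or Janino. Everything in it is hypothetical: `Condition` is a simplified stand-in for `IterativeCondition` (single-argument `filter`, no `Context`), `Transition` stands in for `StateTransition`, and the `compiler` function stands in for a Janino-based compiler. It only illustrates the "replace every `Wrapper` once, in `open()`" idea.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class WrapperCompileSketch {

    /** Hypothetical, simplified stand-in for Flink's IterativeCondition. */
    abstract static class Condition<T> implements Serializable {
        abstract boolean filter(T value);
    }

    /** Hypothetical Wrapper: keeps only the generated source code and a unique name. */
    static final class ConditionWrapper<T> extends Condition<T> {
        final String name;
        final String code;

        ConditionWrapper(String name, String code) {
            this.name = name;
            this.code = code;
        }

        @Override
        boolean filter(T value) {
            // A Wrapper must never be evaluated; it has to be replaced in open() first.
            throw new IllegalStateException("Wrapper '" + name + "' was not compiled in open()");
        }
    }

    /** Hypothetical stand-in for StateTransition and its condition field. */
    static final class Transition<T> implements Serializable {
        Condition<T> condition;

        Transition(Condition<T> condition) {
            this.condition = condition;
        }
    }

    /**
     * Meant to run once from the operator's open(): replaces every Wrapper with a
     * compiled condition, compiling each distinct name only once.
     * Returns the number of compilations performed.
     */
    static <T> int compileWrappers(List<Transition<T>> transitions,
                                   Function<String, Condition<T>> compiler) {
        Map<String, Condition<T>> compiledByName = new HashMap<>();
        for (Transition<T> transition : transitions) {
            if (transition.condition instanceof ConditionWrapper) {
                ConditionWrapper<T> wrapper = (ConditionWrapper<T>) transition.condition;
                Condition<T> compiled = compiledByName.get(wrapper.name);
                if (compiled == null) {
                    compiled = compiler.apply(wrapper.code);
                    compiledByName.put(wrapper.name, compiled);
                }
                transition.condition = compiled;
            }
        }
        return compiledByName.size();
    }
}
```

Caching by the wrapper's unique name is a choice made in this sketch so that transitions sharing one condition trigger a single compilation; the proposal itself only requires that compilation happen in `open()` instead of per element.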




[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    Yes @twalthr, you are right: the cause is a different classloader. The compiled class lives in Janino's custom ClassLoader, so neither the current thread's classloader nor the user code classloader can load it.
    
    But is it possible to pass Janino's classloader into the state deserialization?
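The question comes down to which `ClassLoader` Java deserialization uses to turn class names back into classes. A minimal, JDK-only sketch of an `ObjectInputStream` that resolves classes through a caller-supplied loader (the class and method names are illustrative; Flink's `InstantiationUtil` contains a comparable class-loader-aware stream). It only helps if the loader passed in can actually find the generated class, for example the loader that defined it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves class names through a caller-supplied ClassLoader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the given loader without initializing it.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution (which also handles primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                     new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }
}
```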



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    Hi @dianfu, this branch seems to be broken. Most of the migration tests fail when you run them locally. I will keep checking out the branch anyway, just to see the general idea of the change, because it may give some ideas on how to handle the `IterativeCondition`s in this PR: https://github.com/apache/flink/pull/4145



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @kl0u Thanks for the comments. I think that makes sense. We can revisit this optimization once adding `Pattern`s at runtime is supported :)



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @kl0u, do you mean compiling the code into an `IterativeCondition` and setting it as the `StateTransition.newCondition` field instead of the original wrapper? It would then be serialized during the snapshot and deserialized the next time we get the NFA. But I'm afraid it will throw a `ClassNotFoundException` during deserialization.
    
    You can try this code snippet: https://gist.github.com/wuchong/4d6fc04c131ebcb544665f6518ac0a5e
    Maybe I missed something. 



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @wuchong Yes this is what I had in mind. Could you tell me which class is not found?



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    Hi @dianfu. Thanks for updating the PR.
    
    I left some comments on the related JIRA (https://issues.apache.org/jira/browse/FLINK-6983) and on the related FLINK-6939 (https://issues.apache.org/jira/browse/FLINK-6939). I suggested doing these optimizations at the end, once all the features are in, and focusing on the CEP/SQL integration for now.
    
    Please let me know what you think!



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @dianfu, I am also including @wuchong on this, as these two issues are related.
    
    The way I see it, by not serializing the condition and the states you are trying to gain some speed, especially when using RocksDB, where you serialize/deserialize on every element, right? My suggestion is not to do these optimizations yet.
    
    First, this seems like a premature optimization to me, as we are not yet sure about the interplay between all the features we are planning to put into `CEP`. We do know that if we allow users to add `Patterns` at runtime, we will need 1) to store both States and Conditions and 2) to match the States and Conditions of a given NFA with its SharedBuffer. In other words, we will need a unique Id for each NFA that matches the restored SharedBuffer (which is still serialized and deserialized as before) with the States (`metastates` in this PR) and Conditions (`ConditionRegistry` in https://github.com/apache/flink/pull/4145) of the NFA.
    
    So I propose to implement https://issues.apache.org/jira/browse/FLINK-7008 and https://issues.apache.org/jira/browse/FLINK-6938 right away, so that we can proceed with the SQL integration, and to think about a more general solution for checkpointing the static state of the NFA (states and conditions) separately from the dynamic one (the SharedBuffer), which will lead to runtime gains.
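One way to picture the unique-Id idea above: the static part of each NFA (its states and conditions) is registered once under an id, and each restored per-key SharedBuffer carries only that id. The sketch below is purely illustrative; all class and method names are hypothetical, states and conditions are reduced to plain names, and none of it is Flink API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NfaStateSplitSketch {

    /** Static part of an NFA: states and conditions, reduced to names. Hypothetical. */
    static final class StaticPart {
        final List<String> states;
        final List<String> conditions;

        StaticPart(List<String> states, List<String> conditions) {
            this.states = Collections.unmodifiableList(states);
            this.conditions = Collections.unmodifiableList(conditions);
        }
    }

    /** Dynamic, per-key part: the shared buffer plus the id of the NFA that produced it. */
    static final class DynamicPart {
        final String nfaId;
        final List<String> sharedBuffer;

        DynamicPart(String nfaId, List<String> sharedBuffer) {
            this.nfaId = nfaId;
            this.sharedBuffer = sharedBuffer;
        }
    }

    // Static parts would be checkpointed once per NFA, not on every element.
    private final Map<String, StaticPart> staticById = new HashMap<>();

    void register(String nfaId, StaticPart part) {
        if (staticById.putIfAbsent(nfaId, part) != null) {
            throw new IllegalArgumentException("Duplicate NFA id: " + nfaId);
        }
    }

    /** Re-attaches a restored shared buffer to the states and conditions of its NFA. */
    StaticPart resolve(DynamicPart restored) {
        StaticPart part = staticById.get(restored.nfaId);
        if (part == null) {
            throw new IllegalStateException(
                    "No states/conditions registered for NFA id " + restored.nfaId);
        }
        return part;
    }
}
```

The point of the split is that only `DynamicPart` changes per element; `StaticPart` changes only when a `Pattern` is added or replaced.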



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @wuchong this sounds like a classloader issue. Where does `SerializationUtils` come from? Maybe it is using the wrong class loader. In `CRowProcessRunner` we are using the user code classloader.



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @kl0u I mean the compiled class (i.e. the class of the code-generated `IterativeCondition`) can't be found during deserialization. Maybe it's a problem with the Janino compiler. cc @fhueske, do you know why?



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @kl0u Thanks for your suggestions. I think the optimizations in this PR are straightforward. IMO, the changes that may interact with other ongoing features are all in `AbstractKeyedCEPPatternOperator`, and the changes to that file are small, so they should not be a problem. Since the PR is already available, doing these optimizations will not require much effort. What are your thoughts?



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    @kl0u I'm fine with this. I just want to confirm: in order to support FLINK-6938, we have to call `open()` on every condition after each call to `getNFA()`, right?
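Why `open()` would need to run again: when the NFA (and with it each condition) is deserialized on every access, as with RocksDB, Java deserialization leaves `transient` fields at their default value (`null`), so anything a condition builds in `open()` and does not serialize must be rebuilt. A minimal JDK-only sketch; `GeneratedCondition` is hypothetical, and "compilation" is faked by parsing a threshold out of the code string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.IntPredicate;

final class ReopenAfterDeserializeSketch {

    /** Hypothetical condition that keeps only its source code in serialized form. */
    static final class GeneratedCondition implements Serializable {
        private static final long serialVersionUID = 1L;

        final String code;               // serialized together with the NFA
        transient IntPredicate compiled; // null again after every deserialization

        GeneratedCondition(String code) {
            this.code = code;
        }

        /** Stand-in for compiling `code`; here the "code" is just a threshold. */
        void open() {
            final int threshold = Integer.parseInt(code.trim());
            compiled = v -> v > threshold;
        }

        boolean filter(int value) {
            if (compiled == null) {
                throw new IllegalStateException("open() was not called after deserialization");
            }
            return compiled.test(value);
        }
    }

    /** Serializes and deserializes a condition, as a state backend would. */
    static GeneratedCondition roundTrip(GeneratedCondition condition)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(condition);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (GeneratedCondition) in.readObject();
        }
    }
}
```

Whether the CEP operator should re-open conditions on every `getNFA()` or cache them is exactly the trade-off under discussion; the sketch only shows why a transient compiled form forces one of the two.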



[GitHub] flink issue #4172: [FLINK-6983] [cep] Do not serialize States with NFA

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4172
  
    Hi @wuchong. I am not an expert in Janino and how it works, but I do not think you need Janino's classloader at any point. In the `open()` of the `CEPOperator` you just need to compile the code of the `IterativeCondition` whenever there is an `IterativeConditionWrapper`. In the code snippet you sent before, line 30 should change to take the user classloader from the runtime context, i.e. `getRuntimeContext().getUserCodeClassLoader()`.
    
    This will allow Janino to do its magic using the user classloader. After compiling the class, you can set the `newCondition` field of the `StateTransition`s to an instance of the compiled class.
    
    After this, whenever there is a checkpoint or the condition is serialized/deserialized, Flink will know how to handle it, given that it will be a regular `IterativeCondition`.
    
    I am also pulling in @twalthr and @fhueske to comment, as they have more experience with this. In addition, if you have a prototype that does all this but fails at some step with an error, you can send a link and we can try to check it out.

