Posted to issues@nifi.apache.org by markap14 <gi...@git.apache.org> on 2016/10/07 15:32:50 UTC

[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

GitHub user markap14 opened a pull request:

    https://github.com/apache/nifi/pull/1115

    NIFI-2850: Added a migrate() method to ProcessSession and refactored …

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    
    …BinFiles and MergeContent to use it

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markap14/nifi NIFI-2850

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1115
    
----
commit d939a3d64fcc97a80322f292d13e3d61485a1b3e
Author: Mark Payne <ma...@hotmail.com>
Date:   2016-09-29T18:41:35Z

    NIFI-2850: Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86136287
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
    --- End diff --
    
    I don't believe so. Using putIfAbsent, we would be creating a new ArrayList every time. Using computeIfAbsent lets us create the ArrayList only if the key is not already present.
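
The difference discussed above can be sketched in isolation. The following is a minimal illustration, not code from the PR: plain strings stand in for FlowFiles, and the class and method names (`GroupingSketch`, `groupWithComputeIfAbsent`, `putIfAbsentChainThrowsOnNewKey`) are hypothetical. Besides allocating a list on every call, `Map.putIfAbsent` returns the previous value, which is `null` for a key that was not yet mapped, so chaining `.add(...)` onto it fails the first time a group is seen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: strings stand in for FlowFiles; all names here are hypothetical.
class GroupingSketch {

    // Groups {key, item} pairs the way the diff does: the mapping function runs,
    // and a list is allocated, only when the key is seen for the first time.
    static Map<String, List<String>> groupWithComputeIfAbsent(final List<String[]> keyedItems) {
        final Map<String, List<String>> groups = new HashMap<>();
        for (final String[] pair : keyedItems) {
            groups.computeIfAbsent(pair[0], id -> new ArrayList<>()).add(pair[1]);
        }
        return groups;
    }

    // Reports whether chaining add() onto putIfAbsent() throws for a key not yet in the map.
    static boolean putIfAbsentChainThrowsOnNewKey() {
        final Map<String, List<String>> groups = new HashMap<>();
        try {
            // putIfAbsent returns the previous value, which is null for a new key.
            groups.putIfAbsent("a", new ArrayList<>()).add("x");
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final List<String[]> items = Arrays.asList(
                new String[] {"a", "1"}, new String[] {"b", "2"}, new String[] {"a", "3"});
        System.out.println(groupWithComputeIfAbsent(items));
        System.out.println(putIfAbsentChainThrowsOnNewKey());
    }
}
```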



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r84719151
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
                 }
     
    -            flowFilesBinned++;
    +            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
    +                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
    +                for (final FlowFile flowFile : unbinned) {
    +                    Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    +                    bin.offer(flowFile, session);
    +                    this.readyBins.add(bin);
    +                }
    +            }
    --- End diff --
    
    After looking at ```BinManager.offer(..)``` I am not sure I understand what's happening in the inner loop above. Aren't you essentially doing the same thing in ```BinManager:201```?



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1115



[GitHub] nifi issue #1115: NIFI-2850: Added a migrate() method to ProcessSession and ...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1115
  
    Reviewing. . .



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r87260657
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -377,7 +377,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             final AtomicInteger successfulRecordCount = new AtomicInteger(0);
             List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
             final FlowFile inputFlowFile = flowFile;
    -        final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
    +        final AtomicBoolean processingFailure = new AtomicBoolean(false);
    --- End diff --
    
    @joewitt While updating MockProcessSession to support session migration, I found that it was not keeping track of the 'recursionSet' the way StandardProcessSession does. I fixed that bug, as it had to be addressed in order to properly implement session migration in the mock framework. Fixing it in MockProcessSession then exposed this unrelated bug in PutHiveStreaming. Because the bug in PutHiveStreaming would now cause unit test failures, I addressed it in this PR as well.



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r87256917
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -377,7 +377,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             final AtomicInteger successfulRecordCount = new AtomicInteger(0);
             List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
             final FlowFile inputFlowFile = flowFile;
    -        final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
    +        final AtomicBoolean processingFailure = new AtomicBoolean(false);
    --- End diff --
    
    Why is there any change to the Hive streaming processor in this commit/PR?



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r84713334
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
    --- End diff --
    
    Given that you're not really computing anything in the provided function and are simply returning an empty list, wouldn't this be more appropriate?
    ```
    flowFileGroups.putIfAbsent(groupingIdentifier, new ArrayList<>()).add(flowFile);
    ```



[GitHub] nifi issue #1115: NIFI-2850: Added a migrate() method to ProcessSession and ...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/1115
  
    @olegz Thanks for reviewing! I have responded to your concerns above. If you feel they need more discussion, then we certainly can. Thanks!



[GitHub] nifi pull request #1115: NIFI-2850: Added a migrate() method to ProcessSessi...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86138383
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
                 }
     
    -            flowFilesBinned++;
    +            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
    +                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
    +                for (final FlowFile flowFile : unbinned) {
    +                    Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    +                    bin.offer(flowFile, session);
    +                    this.readyBins.add(bin);
    +                }
    +            }
    --- End diff --
    
    Not exactly. The loop above says "if the bin manager didn't bin it for whatever reason, create our own one-element bin and process it (by adding to this.readyBins) - nothing else will go in this bin." In BinManager:201, it is saying "if the FlowFile didn't fit in any of the bins that are available, create a new bin and add this FlowFile to it. Subsequent FlowFiles may then go into this bin."
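
The distinction drawn above can be sketched with simplified stand-ins. This is not the NiFi `Bin` or `BinManager` API: `BinningSketch`, `SimpleBin`, `offerToManager`, and `bin` are hypothetical names, sizes are plain `long` values, and the only rejection reason modeled is an assumed one, namely an item too large for even an empty bin. The manager-style path opens a new bin that stays available for later items; the caller-style fallback creates an unbounded one-element bin and marks it ready immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not the NiFi Bin/BinManager classes.
class BinningSketch {

    static final class SimpleBin {
        final long maxBytes;
        final List<Long> sizes = new ArrayList<>();
        long total;

        SimpleBin(final long maxBytes) {
            this.maxBytes = maxBytes;
        }

        // Accepts the item only if it fits within the remaining capacity.
        boolean offer(final long size) {
            if (size > maxBytes - total) {
                return false;
            }
            sizes.add(size);
            total += size;
            return true;
        }
    }

    final long maxBytes;
    final List<SimpleBin> openBins = new ArrayList<>();
    final List<SimpleBin> readyBins = new ArrayList<>();

    BinningSketch(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Manager-style: try the existing bins, else open a new bin that later items may join.
    // Returns false when the item cannot fit even in an empty bin (assumed rejection reason).
    boolean offerToManager(final long size) {
        for (final SimpleBin bin : openBins) {
            if (bin.offer(size)) {
                return true;
            }
        }
        final SimpleBin fresh = new SimpleBin(maxBytes);
        if (fresh.offer(size)) {
            openBins.add(fresh);
            return true;
        }
        return false;
    }

    // Caller-style fallback: anything the manager rejected gets its own unbounded bin,
    // which goes straight to readyBins so that nothing else is ever added to it.
    void bin(final long size) {
        if (!offerToManager(size)) {
            final SimpleBin solo = new SimpleBin(Long.MAX_VALUE);
            solo.offer(size);
            readyBins.add(solo);
        }
    }
}
```

With a capacity of 10, offering sizes 4, 5, 50, and 3 in that order leaves two open bins (one holding 4 and 5, one holding 3) and one ready bin holding only the 50.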



[GitHub] nifi issue #1115: NIFI-2850: Added a migrate() method to ProcessSession and ...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1115
  
    @markap14 All is good. +1 here, but I think someone else should give it a look as well, given its complexity.

