You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by phrocker <gi...@git.apache.org> on 2017/07/05 15:56:02 UTC

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

GitHub user phrocker opened a pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117

    MINIFI-338: Convert processor threads to use thread pools

    Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
         in the commit message?
    
    - [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file?
    - [ ] If applicable, have you updated the NOTICE file?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/phrocker/nifi-minifi-cpp MINIFI-338

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi-cpp/pull/117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #117
    
----
commit 3388c068429b08bbb2995184f236a3a451a78dc7
Author: Marc Parisi <ph...@apache.org>
Date:   2017-06-30T14:05:15Z

    MINIFI-338: Convert processor threads to use thread pools

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128656760
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
    
    Need? Not really we could run with the same task, but the premise is to enqueue in the event that something else could be pulled off if another task exists, if this one is dequeued, then we run it unless the timeslice has again said "come back later." Admittedly it's waste of a queue, but we won't know if a task is available after the wait period. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/117
  
    overall looks good. may be some optimization for the queue.
    Please some tests in long duration to make sure it is not breaking the master because it is big change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128773288
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
    
    OK. it is possible to sort the queue or somehow to make it such that the head of the queue is the first to expire.
    In this case, we can avoid enqueue/dequeue for all the items in the queues.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/117
  
    @phrocker the normal flow that i run is attached. one get file connected to a RPG
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128621371
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    --- End diff --
    
    we increase wait_decay if there is not task to run. so the wait_decay may become a very large number if we do not have task to run for a long time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/117
  
    @benqiu2016 I've run this for > 24 hrs without issues. Is there additional testing you would like to see? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128787106
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
    
    Unfortunately that would require a locking queue or dequeuing everything in order to sort. Since it is a lock free queue tasks can be enqueued and dequeued with only a std::move with relatively low cost. An alternative to this would be to make multiple queues, but then that would require a prioritized strategy to access each queue. I think that would be a follow on activity if the need arose. 
    
    The biggest negative thus far is that shutdown takes longer because we are now deterministically awaiting threads to end.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/117
  
    @benqiu2016 I've only run this for two hours. It dramatically improved throughput on that test when configuring several concurrent tasks for my threads. Do you have an example flow that you like to run with? I would be happy to run that overnight. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi-minifi-cpp/pull/117


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128620813
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
    
    do we need to enqueue to head?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp issue #117: MINIFI-338: Convert processor threads to use thr...

Posted by benqiu2016 <gi...@git.apache.org>.
Github user benqiu2016 commented on the issue:

    https://github.com/apache/nifi-minifi-cpp/pull/117
  
    
    [config.txt](https://github.com/apache/nifi-minifi-cpp/files/1166193/config.txt)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi-minifi-cpp pull request #117: MINIFI-338: Convert processor threads to ...

Posted by phrocker <gi...@git.apache.org>.
Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128638003
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    --- End diff --
    
    @benqiu2016 thanks. I thought I took care of that.Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---