Posted to issues@nifi.apache.org by JPercivall <gi...@git.apache.org> on 2016/07/11 15:27:14 UTC

[GitHub] nifi pull request #575: NIFI-619: Make MonitorActivity more cluster friendly

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/575#discussion_r70279855
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---
    @@ -201,17 +253,57 @@ public void process(final OutputStream out) throws IOException {
                 }
             } else {
                 session.transfer(flowFiles, REL_SUCCESS);
    +            updatedLatestSuccessTransfer = now;
                 logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
     
    -            final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
    +            final long latestStateReportTimestamp = latestReportedNodeState.get();
    +            if (SCOPE_CLUSTER.equals(monitoringScope)
    +                    && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
    +                // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
    +                try {
    +                    final StateManager stateManager = context.getStateManager();
    +                    final StateMap state = stateManager.getState(Scope.CLUSTER);
    +
    +                    final Map<String, String> newValues = new HashMap<>();
    +
    +                    // Persist attributes so that other nodes can copy it
    +                    if (copyAttributes) {
    +                        newValues.putAll(flowFiles.get(0).getAttributes());
    +                    }
    +                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
    +
    +                    if (state == null || state.getVersion() == -1) {
    +                        stateManager.setState(newValues, Scope.CLUSTER);
    +                    } else {
    +                        // If this returns false due to race condition, it's not a problem since we just need
    +                        // the latest active timestamp.
    +                        stateManager.replace(state, newValues, Scope.CLUSTER);
    --- End diff --
    
    While this is usually the case, we can't assume that the new state has a later timestamp. One node may be much more heavily loaded than another and take longer to transmit its state to the ZooKeeper cluster (where clustered state is stored), so the state that won the race may carry an older timestamp than ours. This should keep checking and retrying the replace until it either succeeds or the stored timestamp is later than the current one.
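
A minimal sketch of the retry loop being suggested, written against a hypothetical in-memory store rather than the real StateManager so that it runs on its own. The names VersionedStore, Snapshot and publishIfLater are assumptions made for illustration and are not part of NiFi or of this PR; the store only imitates the "replace succeeds if nobody else wrote in between" behaviour of stateManager.replace(), and it folds the initial setState() case into the same call.

```java
// Sketch only: an in-memory stand-in for versioned cluster state.
// VersionedStore, Snapshot and publishIfLater are hypothetical names,
// not NiFi APIs.
final class LatestTimestampSketch {

    /** Hypothetical versioned store with compare-and-replace semantics. */
    static final class VersionedStore {

        /** Immutable view of the stored state at one version. */
        static final class Snapshot {
            final long version;    // -1 means nothing has been stored yet
            final long timestamp;  // Long.MIN_VALUE means no timestamp yet

            Snapshot(long version, long timestamp) {
                this.version = version;
                this.timestamp = timestamp;
            }
        }

        private Snapshot current = new Snapshot(-1L, Long.MIN_VALUE);

        synchronized Snapshot get() {
            return current;
        }

        /** Replaces the state only if it is still at the expected version. */
        synchronized boolean replace(Snapshot expected, long newTimestamp) {
            if (current.version != expected.version) {
                return false; // another writer got in first
            }
            current = new Snapshot(current.version + 1, newTimestamp);
            return true;
        }
    }

    /**
     * Retries until our timestamp is stored, or until the stored timestamp
     * is at least as late as ours. Returns true if this call wrote the value.
     */
    static boolean publishIfLater(VersionedStore store, long now) {
        while (true) {
            VersionedStore.Snapshot seen = store.get();
            if (seen.timestamp >= now) {
                return false; // stored state is already as late or later
            }
            if (store.replace(seen, now)) {
                return true;
            }
            // Lost the race: re-read the state and compare timestamps again.
        }
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        // A fast node reports t=200 first, then a slow node reports t=150.
        System.out.println("fast node wrote: " + publishIfLater(store, 200L));
        System.out.println("slow node wrote: " + publishIfLater(store, 150L));
        System.out.println("stored timestamp: " + store.get().timestamp);
    }
}
```

In the processor itself the loop would presumably re-read the StateMap, compare the stored STATE_KEY_LATEST_SUCCESS_TRANSFER value with now, and call replace() again. Whether to bound the number of attempts is a design choice this sketch leaves open.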

