You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/01/06 19:58:58 UTC

[jira] [Commented] (NIFI-1682) Processor to do Rolling Window calculations using FlowFile attributes

    [ https://issues.apache.org/jira/browse/NIFI-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15805573#comment-15805573 ] 

ASF GitHub Bot commented on NIFI-1682:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1328#discussion_r95009399
  
    --- Diff: nifi-nar-bundles/nifi-stateful-analysis-bundle/nifi-stateful-analysis-processors/src/test/java/org/apache/nifi/processors/stateful/analysis/TestAttributeRollingWindow.java ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.stateful.analysis;
    +
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +public class TestAttributeRollingWindow {
    +
    +
    +    @Test
    +    public void testBasic() throws InterruptedException {
    +        final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class);
    +
    +        runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
    +        runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "3 sec");
    +
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("value", "1");
    +
    +
    +        runner.enqueue("1".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        MockFlowFile flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "1.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "1");
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +         flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "2.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "2");
    +
    +        Thread.sleep(5000L);
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "1.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "1");
    +
    +    }
    +
    +
    +    @Test
    +    public void testVerifyCount() throws InterruptedException {
    +        final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class);
    +
    +        runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
    +        runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec");
    +
    +        MockFlowFile flowFile;
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("value", "1");
    +        for(int i = 1; i<61; i++){
    +            runner.enqueue(String.valueOf(i).getBytes(), attributes);
    +
    +            runner.run();
    +
    +            flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +            runner.clearTransferState();
    +            flowFile.assertAttributeEquals("rolling_window_value", String.valueOf(Double.valueOf(i)));
    +            flowFile.assertAttributeEquals("rolling_window_count", String.valueOf(i));
    +            Thread.sleep(10L);
    +        }
    +
    +
    +
    +        runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
    +        runner.setProperty(AttributeRollingWindow.SUB_WINDOW_LENGTH, "5 sec");
    +        runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec");
    +
    +        for(int i = 1; i<10; i++){
    +            runner.enqueue(String.valueOf(i).getBytes(), attributes);
    +
    +            runner.run();
    +
    +            flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +            runner.clearTransferState();
    +            flowFile.assertAttributeEquals("rolling_window_value", String.valueOf(Double.valueOf(i)));
    +            flowFile.assertAttributeEquals("rolling_window_count", String.valueOf(i));
    +
    +            Thread.sleep(1000L);
    +        }
    +
    +    }
    +
    +
    +    @Test
    +    public void testMicroBatching() throws InterruptedException {
    +        final TestRunner runner = TestRunners.newTestRunner(AttributeRollingWindow.class);
    +
    +        runner.setProperty(AttributeRollingWindow.VALUE_TO_TRACK, "${value}");
    +        runner.setProperty(AttributeRollingWindow.SUB_WINDOW_LENGTH, "5 sec");
    +        runner.setProperty(AttributeRollingWindow.TIME_WINDOW, "10 sec");
    +
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("value", "2");
    +
    +        runner.enqueue("1".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        MockFlowFile flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "2.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "1");
    +
    +        Thread.sleep(2000L);
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "4.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "2");
    +
    +
    +        Thread.sleep(3000L);
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "6.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "3");
    +
    +        Thread.sleep(2000L);
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "8.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "4");
    +
    +        Thread.sleep(3000L);
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "6.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "3");
    +
    +        runner.enqueue("2".getBytes(), attributes);
    +        runner.run(1);
    +        runner.assertAllFlowFilesTransferred(AttributeRollingWindow.REL_SUCCESS, 1);
    +        flowFile = runner.getFlowFilesForRelationship(AttributeRollingWindow.REL_SUCCESS).get(0);
    +        runner.clearTransferState();
    +        flowFile.assertAttributeEquals("rolling_window_value", "8.0");
    +        flowFile.assertAttributeEquals("rolling_window_count", "4");
    +
    +    }
    +
    --- End diff --
    
    Add a small unit test regarding a bad format input value?


> Processor to do Rolling Window calculations using FlowFile attributes
> ---------------------------------------------------------------------
>
>                 Key: NIFI-1682
>                 URL: https://issues.apache.org/jira/browse/NIFI-1682
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> Using state it is now possible to store a map of key value pairs up to 1mb. Taking into account storing a timestamp string and a double converted to a string this is on the order of 5000 values. This enables a processor that can store a rolling window of values to calculate things such as a rolling mean.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)