You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/24 21:08:40 UTC

[nifi] branch main updated: NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b5dd354  NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order
b5dd354 is described below

commit b5dd35431e2251802bd0c13c5de016477ede49c5
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jan 24 11:42:03 2022 -0500

    NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order
    
    This closes #5706.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../scheduling/StatelessProcessScheduler.java      |   4 +-
 .../apache/nifi/stateless/StatelessSystemIT.java   |   2 +-
 .../stateless/basics/ProcessorLifecycleIT.java     |  78 +++++++++++++++
 .../tests/system/WriteLifecycleEvents.java         | 106 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 5 files changed, 190 insertions(+), 3 deletions(-)

diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 07b2081..c129249 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -149,7 +150,8 @@ public class StatelessProcessScheduler implements ProcessScheduler {
         logger.info("Stopping {}", procNode);
         final ProcessContext processContext = processContextFactory.createProcessContext(procNode);
         final LifecycleState lifecycleState = new LifecycleState();
-        lifecycleState.setScheduled(false);
+        final boolean scheduled = procNode.getScheduledState() == ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
+        lifecycleState.setScheduled(scheduled);
         return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
     }
 
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 828f56e..697dd05 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -62,7 +62,7 @@ public class StatelessSystemIT {
     public TestName name = new TestName();
 
     @Rule
-    public Timeout defaultTimeout = new Timeout(30, TimeUnit.MINUTES);
+    public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);
 
 
     @Before
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
new file mode 100644
index 0000000..b2596f7
--- /dev/null
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.basics;
+
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.stateless.StatelessSystemIT;
+import org.apache.nifi.stateless.VersionedFlowBuilder;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ProcessorLifecycleIT extends StatelessSystemIT {
+    private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
+
+    @Test
+    public void testRunProcessorShutdown() throws StatelessConfigurationException, IOException, InterruptedException {
+        final File eventsFile = new File("target/events.txt");
+        Files.deleteIfExists(eventsFile.toPath());
+
+        final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+        final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
+        final VersionedProcessor writeLifecycleEvents = flowBuilder.createSimpleProcessor("WriteLifecycleEvents");
+
+        flowBuilder.createConnection(generate, writeLifecycleEvents, "success");
+
+        writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success"));
+        writeLifecycleEvents.setProperties(Collections.singletonMap("Event File", eventsFile.getAbsolutePath()));
+
+        final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+
+        final DataflowTrigger trigger = dataflow.trigger();
+        final TriggerResult result = trigger.getResult();
+        result.acknowledge();
+
+        dataflow.shutdown();
+
+        List<String> events = Files.readAllLines(eventsFile.toPath());
+
+        // Because the processors may be stopped in the background, we want to wait until we receive the events that we expect.
+        while (events.size() < 4) {
+            logger.info("Expecting to find 4 events written to {} but currently found only {}; will wait 100 milliseconds and check again", eventsFile.getAbsolutePath(), events.size());
+
+            Thread.sleep(100L);
+            events = Files.readAllLines(eventsFile.toPath());
+        }
+
+        assertEquals(Arrays.asList("OnScheduled", "OnTrigger", "OnUnscheduled", "OnStopped"), events);
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java
new file mode 100644
index 0000000..064f829
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class WriteLifecycleEvents extends AbstractProcessor {
+    static final PropertyDescriptor EVENT_FILE = new PropertyDescriptor.Builder()
+        .name("Event File")
+        .displayName("Event File")
+        .description("Specifies the file to write to that contains a line of text for each lifecycle event that occurs")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("target/CountLifecycleEvents.events")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles go here")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.singletonList(EVENT_FILE);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        writeEvent(context, "OnScheduled");
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled(final ProcessContext context) throws IOException {
+        writeEvent(context, "OnUnscheduled");
+    }
+
+    @OnStopped
+    public void onStopped(final ProcessContext context) throws IOException {
+        writeEvent(context, "OnStopped");
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        try {
+            writeEvent(context, "OnTrigger");
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private void writeEvent(final ProcessContext context, final String event) throws IOException {
+        final File file = new File(context.getProperty(EVENT_FILE).getValue());
+
+        final byte[] eventBytes = (event + "\n").getBytes(StandardCharsets.UTF_8);
+
+        try (final OutputStream fos = new FileOutputStream(file, true)) {
+            fos.write(eventBytes);
+        }
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 77143b1..b661c11 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -43,4 +43,5 @@ org.apache.nifi.processors.tests.system.UpdateContent
 org.apache.nifi.processors.tests.system.ValidateFileExists
 org.apache.nifi.processors.tests.system.VerifyContents
 org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
-org.apache.nifi.processors.tests.system.WriteToFile
\ No newline at end of file
+org.apache.nifi.processors.tests.system.WriteLifecycleEvents
+org.apache.nifi.processors.tests.system.WriteToFile