You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/24 21:08:40 UTC
[nifi] branch main updated: NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b5dd354 NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order
b5dd354 is described below
commit b5dd35431e2251802bd0c13c5de016477ede49c5
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jan 24 11:42:03 2022 -0500
NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order
This closes #5706.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../scheduling/StatelessProcessScheduler.java | 4 +-
.../apache/nifi/stateless/StatelessSystemIT.java | 2 +-
.../stateless/basics/ProcessorLifecycleIT.java | 78 +++++++++++++++
.../tests/system/WriteLifecycleEvents.java | 106 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 3 +-
5 files changed, 190 insertions(+), 3 deletions(-)
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 07b2081..c129249 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -149,7 +150,8 @@ public class StatelessProcessScheduler implements ProcessScheduler {
logger.info("Stopping {}", procNode);
final ProcessContext processContext = processContextFactory.createProcessContext(procNode);
final LifecycleState lifecycleState = new LifecycleState();
- lifecycleState.setScheduled(false);
+ final boolean scheduled = procNode.getScheduledState() == ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
+ lifecycleState.setScheduled(scheduled);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
}
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 828f56e..697dd05 100644
--- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -62,7 +62,7 @@ public class StatelessSystemIT {
public TestName name = new TestName();
@Rule
- public Timeout defaultTimeout = new Timeout(30, TimeUnit.MINUTES);
+ public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);
@Before
diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
new file mode 100644
index 0000000..b2596f7
--- /dev/null
+++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.basics;
+
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.stateless.StatelessSystemIT;
+import org.apache.nifi.stateless.VersionedFlowBuilder;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ProcessorLifecycleIT extends StatelessSystemIT {
+ private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
+
+ @Test
+ public void testRunProcessorShutdown() throws StatelessConfigurationException, IOException, InterruptedException {
+ final File eventsFile = new File("target/events.txt");
+ Files.deleteIfExists(eventsFile.toPath());
+
+ final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder();
+
+ final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile");
+ final VersionedProcessor writeLifecycleEvents = flowBuilder.createSimpleProcessor("WriteLifecycleEvents");
+
+ flowBuilder.createConnection(generate, writeLifecycleEvents, "success");
+
+ writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success"));
+ writeLifecycleEvents.setProperties(Collections.singletonMap("Event File", eventsFile.getAbsolutePath()));
+
+ final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList());
+
+ final DataflowTrigger trigger = dataflow.trigger();
+ final TriggerResult result = trigger.getResult();
+ result.acknowledge();
+
+ dataflow.shutdown();
+
+ List<String> events = Files.readAllLines(eventsFile.toPath());
+
+ // Because the processors may be stopped in the background, we want to wait until we receive the events that we expect.
+ while (events.size() < 4) {
+ logger.info("Expecting to find 4 events written to {} but currently found only {}; will wait 100 milliseconds and check again", eventsFile.getAbsolutePath(), events.size());
+
+ Thread.sleep(100L);
+ events = Files.readAllLines(eventsFile.toPath());
+ }
+
+ assertEquals(Arrays.asList("OnScheduled", "OnTrigger", "OnUnscheduled", "OnStopped"), events);
+ }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java
new file mode 100644
index 0000000..064f829
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class WriteLifecycleEvents extends AbstractProcessor {
+ static final PropertyDescriptor EVENT_FILE = new PropertyDescriptor.Builder()
+ .name("Event File")
+ .displayName("Event File")
+ .description("Specifies the file to write to that contains a line of text for each lifecycle event that occurs")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("target/CountLifecycleEvents.events")
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles go here")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Collections.singletonList(EVENT_FILE);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ writeEvent(context, "OnScheduled");
+ }
+
+ @OnUnscheduled
+ public void onUnscheduled(final ProcessContext context) throws IOException {
+ writeEvent(context, "OnUnscheduled");
+ }
+
+ @OnStopped
+ public void onStopped(final ProcessContext context) throws IOException {
+ writeEvent(context, "OnStopped");
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ try {
+ writeEvent(context, "OnTrigger");
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
+ private void writeEvent(final ProcessContext context, final String event) throws IOException {
+ final File file = new File(context.getProperty(EVENT_FILE).getValue());
+
+ final byte[] eventBytes = (event + "\n").getBytes(StandardCharsets.UTF_8);
+
+ try (final OutputStream fos = new FileOutputStream(file, true)) {
+ fos.write(eventBytes);
+ }
+ }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 77143b1..b661c11 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -43,4 +43,5 @@ org.apache.nifi.processors.tests.system.UpdateContent
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
-org.apache.nifi.processors.tests.system.WriteToFile
\ No newline at end of file
+org.apache.nifi.processors.tests.system.WriteLifecycleEvents
+org.apache.nifi.processors.tests.system.WriteToFile