You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/16 16:47:30 UTC
[37/50] [abbrv] incubator-nifi git commit: NIFI-720: Ensure that if
Reporting Task stopped while @OnScheduled method is failing that it does not
start running when configuration is fixed
NIFI-720: Ensure that if Reporting Task stopped while @OnScheduled method is failing that it does not start running when configuration is fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8da73271
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8da73271
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8da73271
Branch: refs/heads/master
Commit: 8da7327188ebdb3cbadda257429e4967be07bf77
Parents: 2a4e5e1
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jun 23 16:23:01 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 3 13:14:16 2015 -0400
----------------------------------------------------------------------
.../scheduling/StandardProcessScheduler.java | 74 ++++++++-----
.../TestStandardProcessScheduler.java | 107 +++++++++++++++++++
.../src/test/resources/nifi.properties | 2 +-
3 files changed, 154 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index cf644ed..d976bd0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -174,31 +174,45 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@SuppressWarnings("deprecation")
@Override
public void run() {
+ final long lastStopTime = scheduleState.getLastStopTime();
+ final ReportingTask reportingTask = taskNode.getReportingTask();
+
// Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
while (true) {
- final ReportingTask reportingTask = taskNode.getReportingTask();
-
try {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
- }
+ synchronized (scheduleState) {
+ // if no longer scheduled to run, then we're finished. This can happen, for example,
+ // if the @OnScheduled method throws an Exception and the user stops the reporting task
+ // while we're administratively yielded.
+ // we also check if the schedule state's last start time is equal to what it was before.
+ // if not, then means that the reporting task has been stopped and started again, so we should just
+ // bail; another thread will be responsible for invoking the @OnScheduled methods.
+ if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+ return;
+ }
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
+ }
- break;
+ agent.schedule(taskNode, scheduleState);
+ return;
+ }
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
- LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+ LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
+ + "ReportingTask and will attempt to schedule it again after {}",
+ new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e);
+
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
}
}
-
- agent.schedule(taskNode, scheduleState);
}
};
@@ -216,7 +230,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
taskNode.verifyCanStop();
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
- scheduleState.setScheduled(false);
taskNode.setScheduledState(ScheduledState.STOPPED);
final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@@ -225,29 +238,33 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public void run() {
final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
- try {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
- }
- } catch (final Exception e) {
- final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
- final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
- componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
+ synchronized (scheduleState) {
+ scheduleState.setScheduled(false);
+
+ try {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
+ }
+ } catch (final Exception e) {
+ final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+ componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
- LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
reportingTask, cause.toString(), administrativeYieldDuration);
- LOG.error("", cause);
+ LOG.error("", cause);
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
+ try {
+ Thread.sleep(administrativeYieldMillis);
+ } catch (final InterruptedException ie) {
+ }
}
- }
- agent.unschedule(taskNode, scheduleState);
+ agent.unschedule(taskNode, scheduleState);
- if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+ if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+ }
}
}
};
@@ -694,6 +711,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
}
}
+
}
};
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
new file mode 100644
index 0000000..72eaa84
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.Heartbeater;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardProcessScheduler {
+ private StandardProcessScheduler scheduler = null;
+ private ReportingTaskNode taskNode = null;
+ private TestReportingTask reportingTask = null;
+
+ @Before
+ public void setup() throws InitializationException {
+ System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+ scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null);
+ scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
+
+ reportingTask = new TestReportingTask();
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
+ Mockito.mock(ComponentLog.class), null);
+ reportingTask.initialize(config);
+
+ final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null);
+ taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory);
+ }
+
+ /**
+ * We have run into an issue where a Reporting Task is scheduled to run but throws an Exception
+ * from a method with the @OnScheduled annotation. User stops Reporting Task, updates configuration
+ * to fix the issue. Reporting Task then finishes running @OnSchedule method and is then scheduled to run.
+ * This unit test is intended to verify that we have this resolved.
+ */
+ @Test
+ public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException {
+ scheduler.schedule(taskNode);
+
+ // Let it try to run a few times.
+ Thread.sleep(1000L);
+
+ scheduler.unschedule(taskNode);
+
+ final int attempts = reportingTask.onScheduleAttempts.get();
+ // give it a sec to make sure that it's finished running.
+ Thread.sleep(1500L);
+ final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts;
+
+ // allow 1 extra run, due to timing issues that could call it as it's being stopped.
+ assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1);
+ }
+
+
+ private class TestReportingTask extends AbstractReportingTask {
+ private final AtomicBoolean failOnScheduled = new AtomicBoolean(true);
+ private final AtomicInteger onScheduleAttempts = new AtomicInteger(0);
+ private final AtomicInteger triggerCount = new AtomicInteger(0);
+
+ @OnScheduled
+ public void onScheduled() {
+ onScheduleAttempts.incrementAndGet();
+
+ if (failOnScheduled.get()) {
+ throw new RuntimeException("Intentional Exception for testing purposes");
+ }
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context) {
+ triggerCount.getAndIncrement();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
index e7ea9b6..5d3344d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
@@ -20,7 +20,7 @@ nifi.flow.configuration.archive.dir=./target/archive/
nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=2 sec
-nifi.administrative.yield.duration=30 sec
+nifi.administrative.yield.duration=500 millis
nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
nifi.controller.service.configuration.file=./target/controller-services.xml