You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/16 16:47:30 UTC

[37/50] [abbrv] incubator-nifi git commit: NIFI-720: Ensure that if Reporting Task stopped while @OnScheduled method is failing that it does not start running when configuration is fixed

NIFI-720: Ensure that if Reporting Task stopped while @OnScheduled method is failing that it does not start running when configuration is fixed


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8da73271
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8da73271
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8da73271

Branch: refs/heads/master
Commit: 8da7327188ebdb3cbadda257429e4967be07bf77
Parents: 2a4e5e1
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jun 23 16:23:01 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 3 13:14:16 2015 -0400

----------------------------------------------------------------------
 .../scheduling/StandardProcessScheduler.java    |  74 ++++++++-----
 .../TestStandardProcessScheduler.java           | 107 +++++++++++++++++++
 .../src/test/resources/nifi.properties          |   2 +-
 3 files changed, 154 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index cf644ed..d976bd0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -174,31 +174,45 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             @SuppressWarnings("deprecation")
             @Override
             public void run() {
+                final long lastStopTime = scheduleState.getLastStopTime();
+                final ReportingTask reportingTask = taskNode.getReportingTask();
+
                 // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
                 while (true) {
-                    final ReportingTask reportingTask = taskNode.getReportingTask();
-
                     try {
-                        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                            ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
-                        }
+                        synchronized (scheduleState) {
+                            // if no longer scheduled to run, then we're finished. This can happen, for example,
+                            // if the @OnScheduled method throws an Exception and the user stops the reporting task
+                            // while we're administratively yielded.
+                            // we also check if the schedule state's last start time is equal to what it was before.
+                            // if not, then means that the reporting task has been stopped and started again, so we should just
+                            // bail; another thread will be responsible for invoking the @OnScheduled methods.
+                            if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+                                return;
+                            }
+
+                            try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                                ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
+                            }
 
-                        break;
+                            agent.schedule(taskNode, scheduleState);
+                            return;
+                        }
                     } catch (final Exception e) {
                         final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                         final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
                         componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
 
-                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
+                            + "ReportingTask and will attempt to schedule it again after {}",
+                            new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e);
+
                         try {
                             Thread.sleep(administrativeYieldMillis);
                         } catch (final InterruptedException ie) {
                         }
                     }
                 }
-
-                agent.schedule(taskNode, scheduleState);
             }
         };
 
@@ -216,7 +230,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         taskNode.verifyCanStop();
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
-        scheduleState.setScheduled(false);
         taskNode.setScheduledState(ScheduledState.STOPPED);
 
         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@@ -225,29 +238,33 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             public void run() {
                 final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
 
-                try {
-                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                        ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
-                    }
-                } catch (final Exception e) {
-                    final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-                    final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
-                    componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
+                synchronized (scheduleState) {
+                    scheduleState.setScheduled(false);
+
+                    try {
+                        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                            ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
+                        }
+                    } catch (final Exception e) {
+                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+                        final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+                        componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
 
-                    LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                        LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                             reportingTask, cause.toString(), administrativeYieldDuration);
-                    LOG.error("", cause);
+                        LOG.error("", cause);
 
-                    try {
-                        Thread.sleep(administrativeYieldMillis);
-                    } catch (final InterruptedException ie) {
+                        try {
+                            Thread.sleep(administrativeYieldMillis);
+                        } catch (final InterruptedException ie) {
+                        }
                     }
-                }
 
-                agent.unschedule(taskNode, scheduleState);
+                    agent.unschedule(taskNode, scheduleState);
 
-                if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+                    if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+                    }
                 }
             }
         };
@@ -694,6 +711,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                         }
                     }
                 }
+
             }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
new file mode 100644
index 0000000..72eaa84
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.Heartbeater;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardProcessScheduler {
+    private StandardProcessScheduler scheduler = null;
+    private ReportingTaskNode taskNode = null;
+    private TestReportingTask reportingTask = null;
+
+    @Before
+    public void setup() throws InitializationException {
+        System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+        scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null);
+        scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
+
+        reportingTask = new TestReportingTask();
+        final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
+            Mockito.mock(ComponentLog.class), null);
+        reportingTask.initialize(config);
+
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null);
+        taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory);
+    }
+
+    /**
+     * We have run into an issue where a Reporting Task is scheduled to run but throws an Exception
+     * from a method with the @OnScheduled annotation. User stops Reporting Task, updates configuration
+     * to fix the issue. Reporting Task then finishes running @OnSchedule method and is then scheduled to run.
+     * This unit test is intended to verify that we have this resolved.
+     */
+    @Test
+    public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException {
+        scheduler.schedule(taskNode);
+
+        // Let it try to run a few times.
+        Thread.sleep(1000L);
+
+        scheduler.unschedule(taskNode);
+
+        final int attempts = reportingTask.onScheduleAttempts.get();
+        // give it a sec to make sure that it's finished running.
+        Thread.sleep(1500L);
+        final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts;
+
+        // allow 1 extra run, due to timing issues that could call it as it's being stopped.
+        assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1);
+    }
+
+
+    private class TestReportingTask extends AbstractReportingTask {
+        private final AtomicBoolean failOnScheduled = new AtomicBoolean(true);
+        private final AtomicInteger onScheduleAttempts = new AtomicInteger(0);
+        private final AtomicInteger triggerCount = new AtomicInteger(0);
+
+        @OnScheduled
+        public void onScheduled() {
+            onScheduleAttempts.incrementAndGet();
+
+            if (failOnScheduled.get()) {
+                throw new RuntimeException("Intentional Exception for testing purposes");
+            }
+        }
+
+        @Override
+        public void onTrigger(final ReportingContext context) {
+            triggerCount.getAndIncrement();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8da73271/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
index e7ea9b6..5d3344d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
@@ -20,7 +20,7 @@ nifi.flow.configuration.archive.dir=./target/archive/
 nifi.flowcontroller.autoResumeState=true
 nifi.flowcontroller.graceful.shutdown.period=10 sec
 nifi.flowservice.writedelay.interval=2 sec
-nifi.administrative.yield.duration=30 sec
+nifi.administrative.yield.duration=500 millis
 
 nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
 nifi.controller.service.configuration.file=./target/controller-services.xml