You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/11/14 17:34:34 UTC

nifi git commit: NIFI-1526: DefaultSchedule annotation should be use on Custom Processor to set the default scheduling strategy, scheduling period or max number of concurrent task for each instance of the processor DefaultSettings annotation should be us

Repository: nifi
Updated Branches:
  refs/heads/master 8d3177c38 -> 65b7b377e


NIFI-1526: DefaultSchedule annotation should be use on Custom Processor to set the default scheduling strategy, scheduling period or max number of concurrent task for each instance of the processor
DefaultSettings annotation should be use on Custom Processor to set the default penalty period, the yield duration or the bulletin log level for each instance of the processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65b7b377
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65b7b377
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65b7b377

Branch: refs/heads/master
Commit: 65b7b377e3072b83f75e63b053a4f080726d824d
Parents: 8d3177c
Author: Mathias Tiberghien <ma...@code192.com>
Authored: Wed Oct 5 17:50:08 2016 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Nov 14 12:04:21 2016 -0500

----------------------------------------------------------------------
 .../configuration/DefaultSchedule.java          | 45 ++++++++++++++++++++
 .../configuration/DefaultSettings.java          | 42 ++++++++++++++++++
 .../apache/nifi/controller/FlowController.java  | 27 ++++++++++++
 .../nifi/controller/StandardProcessorNode.java  | 27 +++++++++++-
 .../controller/DummyScheduledProcessor.java     | 37 ++++++++++++++++
 .../nifi/controller/DummySettingsProcessor.java | 36 ++++++++++++++++
 .../nifi/controller/TestFlowController.java     | 30 +++++++++++++
 7 files changed, 243 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java
new file mode 100644
index 0000000..ff9125c
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.annotation.configuration;
+
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Inherited;
+
+/**
+ * <p>
+ * Marker interface that a Processor can use to configure the schedule strategy, the  period and the number of concurrent tasks.
+ * Note that the number of Concurrent tasks will be ignored if the annotion @TriggerSerialy is used
+ * </p>
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface DefaultSchedule {
+
+    SchedulingStrategy strategy() default  SchedulingStrategy.TIMER_DRIVEN;
+    String period() default "0 sec";
+    int concurrentTasks() default 1;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java
new file mode 100644
index 0000000..d01972c
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.annotation.configuration;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Inherited;
+import org.apache.nifi.logging.LogLevel;
+
+/**
+ * <p>
+ * Marker interface that a Processor can use to configure the yield duration, the  penalty duration and the bulletin log level.
+ * Note that the number of Concurrent tasks will be ignored if the annotation @TriggerSerialy is used
+ * </p>
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface DefaultSettings {
+    String yieldDuration() default "1 sec";
+    String penaltyDuration() default "30 sec";
+    LogLevel bulletinLevel() default LogLevel.WARN;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b790526..a5789e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -21,6 +21,7 @@ import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -1060,6 +1061,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
 
+        try {
+            final Class<?> procClass = processor.getClass();
+            if(procClass.isAnnotationPresent(DefaultSettings.class)) {
+                DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
+                try {
+                    procNode.setYieldPeriod(ds.yieldDuration());
+                } catch(Throwable ex) {
+                    LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex);
+                }
+                try {
+
+                    procNode.setPenalizationPeriod(ds.penaltyDuration());
+                } catch(Throwable ex) {
+                    LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
+                }
+                try {
+                    procNode.setBulletinLevel(ds.bulletinLevel());
+                } catch (Throwable ex) {
+                    LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex);
+                }
+
+            }
+        } catch (Throwable ex) {
+            LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
+        }
+
         if (firstTimeAdded) {
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
                 ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 27fdc0f..aede825 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -152,7 +153,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
                                  final VariableRegistry variableRegistry, final ComponentLog logger) {
 
         super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
-
         this.processor = processor;
         identifier = new AtomicReference<>(uuid);
         destinations = new HashMap<>();
@@ -191,6 +191,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
         schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
         executionNode = ExecutionNode.ALL;
+        try {
+            if (procClass.isAnnotationPresent(DefaultSchedule.class)) {
+                DefaultSchedule dsc = procClass.getAnnotation(DefaultSchedule.class);
+                try {
+                    this.setSchedulingStrategy(dsc.strategy());
+                } catch (Throwable ex) {
+                    LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex);
+                }
+                try {
+                    this.setScheduldingPeriod(dsc.period());
+                } catch (Throwable ex) {
+                    this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
+                    LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex);
+                }
+                if (!triggeredSerially) {
+                    try {
+                        setMaxConcurrentTasks(dsc.concurrentTasks());
+                    } catch (Throwable ex) {
+                        LOG.error(String.format("Error while setting max concurrent tasks from DefaultSchedule annotation: %s", ex.getMessage()), ex);
+                    }
+                }
+            }
+        } catch (Throwable ex) {
+            LOG.error(String.format("Error while setting default schedule from DefaultSchedule annotation: %s",ex.getMessage()),ex);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java
new file mode 100644
index 0000000..b8e469b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+
+/**
+ * Dummy processor to test @DefaultSchedule annotation
+ */
+@DefaultSchedule(concurrentTasks = 5, strategy = SchedulingStrategy.CRON_DRIVEN, period = "0 0 0 1/1 * ?")
+public class DummyScheduledProcessor extends AbstractProcessor {
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java
new file mode 100644
index 0000000..34c16af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Dummy Processor to test @DefaultSettings annotation
+ */
+@DefaultSettings(yieldDuration = "5 sec", penaltyDuration = "1 min", bulletinLevel = LogLevel.DEBUG)
+public class DummySettingsProcessor extends AbstractProcessor {
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/65b7b377/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index bbcdc3b..32f8135 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -46,10 +46,12 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.MockProvenanceRepository;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FileBasedVariableRegistry;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
@@ -57,6 +59,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+
 public class TestFlowController {
 
     private FlowController controller;
@@ -327,4 +330,31 @@ public class TestFlowController {
         assertFalse(service.equals(serviceNode));
     }
 
+    @Test
+    public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException {
+        ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor");
+        assertEquals(5,p_scheduled.getMaxConcurrentTasks());
+        assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy());
+        assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod());
+        assertEquals("1 sec",p_scheduled.getYieldPeriod());
+        assertEquals("30 sec",p_scheduled.getPenalizationPeriod());
+        assertEquals(LogLevel.WARN,p_scheduled.getBulletinLevel());
+    }
+
+    @Test
+    public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException {
+
+        ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor");
+        assertEquals("5 sec",p_settings.getYieldPeriod());
+        assertEquals("1 min",p_settings.getPenalizationPeriod());
+        assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel());
+        assertEquals(1,p_settings.getMaxConcurrentTasks());
+        assertEquals(SchedulingStrategy.TIMER_DRIVEN,p_settings.getSchedulingStrategy());
+        assertEquals("0 sec",p_settings.getSchedulingPeriod());
+    }
+
+
+
+
+
 }