You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/11 19:41:18 UTC

[16/25] incubator-nifi git commit: NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions

NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb2e855f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb2e855f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb2e855f

Branch: refs/heads/bootstrap
Commit: cb2e855fc7c42536887b055baadf93683d764a47
Parents: f60a97b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 10 12:49:31 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Dec 10 12:49:31 2014 -0500

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       |  4 ++--
 .../nifi/processors/standard/MergeContent.java  | 22 +++++++++++++-------
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8aafb58..1064536 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -46,6 +46,7 @@ import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.annotation.OnRemoved;
 import org.apache.nifi.processor.annotation.OnShutdown;
@@ -53,7 +54,6 @@ import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -664,7 +664,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
                 final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor);
-                ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
             } catch (final Exception e) {
                 throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index d443e00..9a932f0 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -76,7 +76,6 @@ import org.apache.nifi.util.FlowFilePackagerV1;
 import org.apache.nifi.util.FlowFilePackagerV2;
 import org.apache.nifi.util.FlowFilePackagerV3;
 import org.apache.nifi.util.ObjectHolder;
-
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 
@@ -317,6 +316,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
         return Files.readAllBytes(Paths.get(filename));
     }
 
+    
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         int binsAdded = binFlowFiles(context, sessionFactory);
@@ -331,6 +331,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
             context.yield();
         }
     }
+    
 
     private int migrateBins(final ProcessContext context) {
         int added = 0;
@@ -548,20 +549,27 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
     public void onScheduled(final ProcessContext context) throws IOException {
         binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
 
-        if (context.getProperty(MAX_BIN_AGE).getValue() != null) {
+        if (context.getProperty(MAX_BIN_AGE).isSet() ) {
             binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+        } else {
+            binManager.setMaxBinAge(Integer.MAX_VALUE);
         }
-
-        if (context.getProperty(MAX_SIZE).getValue() != null) {
+        
+        if ( context.getProperty(MAX_SIZE).isSet() ) {
             binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+        } else {
+            binManager.setMaximumSize(Long.MAX_VALUE);
         }
-
+        
         if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         } else {
             binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-            if (context.getProperty(MAX_ENTRIES).getValue() != null) {
-                binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger());
+
+            if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+                binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+            } else {
+                binManager.setMaximumEntries(Integer.MAX_VALUE);
             }
         }