You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/11 19:41:18 UTC
[16/25] incubator-nifi git commit: NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions
NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb2e855f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb2e855f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb2e855f
Branch: refs/heads/bootstrap
Commit: cb2e855fc7c42536887b055baadf93683d764a47
Parents: f60a97b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 10 12:49:31 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Dec 10 12:49:31 2014 -0500
----------------------------------------------------------------------
.../nifi/groups/StandardProcessGroup.java | 4 ++--
.../nifi/processors/standard/MergeContent.java | 22 +++++++++++++-------
2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8aafb58..1064536 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -46,6 +46,7 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.annotation.OnRemoved;
import org.apache.nifi.processor.annotation.OnShutdown;
@@ -53,7 +54,6 @@ import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -664,7 +664,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor);
- ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index d443e00..9a932f0 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -76,7 +76,6 @@ import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.ObjectHolder;
-
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
@@ -317,6 +316,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
return Files.readAllBytes(Paths.get(filename));
}
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
int binsAdded = binFlowFiles(context, sessionFactory);
@@ -331,6 +331,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
context.yield();
}
}
+
private int migrateBins(final ProcessContext context) {
int added = 0;
@@ -548,20 +549,27 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
public void onScheduled(final ProcessContext context) throws IOException {
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
- if (context.getProperty(MAX_BIN_AGE).getValue() != null) {
+ if (context.getProperty(MAX_BIN_AGE).isSet() ) {
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+ } else {
+ binManager.setMaxBinAge(Integer.MAX_VALUE);
}
-
- if (context.getProperty(MAX_SIZE).getValue() != null) {
+
+ if ( context.getProperty(MAX_SIZE).isSet() ) {
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+ } else {
+ binManager.setMaximumSize(Long.MAX_VALUE);
}
-
+
if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
} else {
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
- if (context.getProperty(MAX_ENTRIES).getValue() != null) {
- binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger());
+
+ if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+ binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+ } else {
+ binManager.setMaximumEntries(Integer.MAX_VALUE);
}
}