You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 21:35:30 UTC
[1/5] incubator-nifi git commit: NIFI-333 - catching ProcessException
instead of Exception when appropriate catching ProcessException in processors
instead of exception
Repository: incubator-nifi
Updated Branches:
refs/heads/develop 705ee852b -> d8954ab01
NIFI-333 - catching ProcessException instead of Exception
when appropriate catching ProcessException in processors instead of
exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0f8d00d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0f8d00d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0f8d00d5
Branch: refs/heads/develop
Commit: 0f8d00d5fff947647926570ccfa4b83ac9411e4b
Parents: b8ade5b
Author: danbress <db...@onyxconsults.com>
Authored: Sat Feb 14 12:20:18 2015 -0500
Committer: danbress <db...@onyxconsults.com>
Committed: Sat Feb 14 12:28:33 2015 -0500
----------------------------------------------------------------------
.../hadoop/CreateHadoopSequenceFile.java | 4 +--
.../standard/Base64EncodeContent.java | 13 +++++-----
.../processors/standard/CompressContent.java | 27 ++++++++++----------
.../nifi/processors/standard/HashContent.java | 14 +++++-----
.../nifi/processors/standard/PutEmail.java | 5 ++--
.../nifi/processors/standard/TransformXml.java | 3 ++-
6 files changed, 35 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 1422a7b..a031923 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,7 +34,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
/**
* <p>
@@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
- } catch (Exception e) {
+ } catch (ProcessException e) {
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index d9175e0..cd272ff 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.util.StopWatch;
@@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor {
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Exception e) {
+ } catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index cf20f16..e631cd0 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
+
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
@@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor {
compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (final Exception e) {
+ } catch (final ProcessException e) {
logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e});
session.transfer(flowFile, REL_FAILURE);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
index 827653b..9f8a16c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
@@ -28,23 +28,23 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor {
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
- } catch (final Exception e) {
+ } catch (final ProcessException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
index 2fa71c8..eb6b1cc 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
@@ -32,6 +32,7 @@ import java.util.Set;
import javax.activation.DataHandler;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
+import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.URLName;
import javax.mail.internet.AddressException;
@@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-
import org.apache.commons.codec.binary.Base64;
import com.sun.mail.smtp.SMTPTransport;
@@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
- } catch (final Exception e) {
+ } catch (final ProcessException | MessagingException | IOException e) {
context.yield();
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0f8d00d5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
index 5e251f6..8a2feb8 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
@@ -51,6 +51,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@@ -152,7 +153,7 @@ public class TransformXml extends AbstractProcessor {
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
- } catch (Exception e) {
+ } catch (ProcessException e) {
logger.error("Unable to transform {} due to {}", new Object[]{original, e});
session.transfer(original, REL_FAILURE);
}
[5/5] incubator-nifi git commit: Merge branch 'NIFI-333' of
https://github.com/danbress/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'NIFI-333' of https://github.com/danbress/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d8954ab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d8954ab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d8954ab0
Branch: refs/heads/develop
Commit: d8954ab0137b0375dce2be8b65adbe721fc74f1b
Parents: 705ee85 c8810c0
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 23 15:34:13 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 23 15:34:13 2015 -0500
----------------------------------------------------------------------
.../hadoop/CreateHadoopSequenceFile.java | 4 +-
.../standard/Base64EncodeContent.java | 13 ++-
.../processors/standard/CompressContent.java | 27 ++---
.../nifi/processors/standard/HashContent.java | 14 +--
.../nifi/processors/standard/PutEmail.java | 5 +-
.../processors/standard/SegmentContent.java | 114 +++++++++----------
.../nifi/processors/standard/TransformXml.java | 3 +-
.../standard/TestCompressContent.java | 18 +++
.../nifi/processors/standard/TestPutEmail.java | 45 ++++++++
9 files changed, 152 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
[4/5] incubator-nifi git commit: NIFI-333 - Removing exception
handling in SegmentContent From Mark - 'There's no exception that could get
thrown in there unless there's something weird - in which case the framework
should catch it and handle it'
Posted by ma...@apache.org.
NIFI-333 - Removing exception handling in SegmentContent
From Mark - 'There's no exception that could get thrown in there unless
there's something weird - in which case the framework should catch it and
handle it'
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c8810c04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c8810c04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c8810c04
Branch: refs/heads/develop
Commit: c8810c04d8dcb6823c7e59e85158a0c3cb18675f
Parents: 0fa1b16
Author: danbress <db...@onyxconsults.com>
Authored: Tue Feb 17 11:17:46 2015 -0500
Committer: danbress <db...@onyxconsults.com>
Committed: Tue Feb 17 11:17:46 2015 -0500
----------------------------------------------------------------------
.../processors/standard/SegmentContent.java | 114 +++++++++----------
1 file changed, 54 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c8810c04/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index dfdd401..cf0539e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -25,6 +25,11 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor {
return;
}
- try {
- final String segmentId = UUID.randomUUID().toString();
- final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
-
- final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
-
- if (flowFile.getSize() <= segmentSize) {
- flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
- flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
- flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
- flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
-
- flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
- flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
- flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
-
- FlowFile clone = session.clone(flowFile);
- session.transfer(flowFile, REL_ORIGINAL);
- session.transfer(clone, REL_SEGMENTS);
- return;
- }
-
- int totalSegments = (int) (flowFile.getSize() / segmentSize);
- if (totalSegments * segmentSize < flowFile.getSize()) {
- totalSegments++;
- }
-
- final Map<String, String> segmentAttributes = new HashMap<>();
- segmentAttributes.put(SEGMENT_ID, segmentId);
- segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
- segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
-
- segmentAttributes.put(FRAGMENT_ID, segmentId);
- segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
-
- final Set<FlowFile> segmentSet = new HashSet<>();
- for (int i = 1; i <= totalSegments; i++) {
- final long segmentOffset = segmentSize * (i - 1);
- FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
- segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
- segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
- segment = session.putAllAttributes(segment, segmentAttributes);
- segmentSet.add(segment);
- }
-
- session.transfer(segmentSet, REL_SEGMENTS);
+ final String segmentId = UUID.randomUUID().toString();
+ final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
+
+ final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ if (flowFile.getSize() <= segmentSize) {
+ flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
+ flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
+ flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
+ flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+ flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
+ flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
+ flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
+
+ FlowFile clone = session.clone(flowFile);
session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(clone, REL_SEGMENTS);
+ return;
+ }
- if (totalSegments <= 10) {
- getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
- } else {
- getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
- }
- } catch (final Exception e) {
- throw new ProcessException(e);
+ int totalSegments = (int) (flowFile.getSize() / segmentSize);
+ if (totalSegments * segmentSize < flowFile.getSize()) {
+ totalSegments++;
}
- }
+ final Map<String, String> segmentAttributes = new HashMap<>();
+ segmentAttributes.put(SEGMENT_ID, segmentId);
+ segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
+ segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+ segmentAttributes.put(FRAGMENT_ID, segmentId);
+ segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
+
+ final Set<FlowFile> segmentSet = new HashSet<>();
+ for (int i = 1; i <= totalSegments; i++) {
+ final long segmentOffset = segmentSize * (i - 1);
+ FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
+ segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
+ segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
+ segment = session.putAllAttributes(segment, segmentAttributes);
+ segmentSet.add(segment);
+ }
+
+ session.transfer(segmentSet, REL_SEGMENTS);
+ session.transfer(flowFile, REL_ORIGINAL);
+
+ if (totalSegments <= 10) {
+ getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
+ } else {
+ getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
+ }
+ }
}
[3/5] incubator-nifi git commit: NIFI-333 - Adding failure test for
PutEmail
Posted by ma...@apache.org.
NIFI-333 - Adding failure test for PutEmail
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0fa1b16c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0fa1b16c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0fa1b16c
Branch: refs/heads/develop
Commit: 0fa1b16c831c413a2e31e768398da0602ca3758d
Parents: 361ac1f
Author: danbress <db...@onyxconsults.com>
Authored: Sun Feb 15 09:24:51 2015 -0500
Committer: danbress <db...@onyxconsults.com>
Committed: Sun Feb 15 09:24:51 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/TestPutEmail.java | 45 ++++++++++++++++++++
1 file changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0fa1b16c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
new file mode 100644
index 0000000..b737ed6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestPutEmail {
+
+ @Test
+ public void testHotNotFound() {
+ // verifies that files are routed to failure when the SMTP host doesn't exist
+ final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
+ runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
+ runner.setProperty(PutEmail.FROM, "test@apache.org");
+ runner.setProperty(PutEmail.TO, "test@apache.org");
+ runner.setProperty(PutEmail.MESSAGE, "Message Body");
+
+ final Map<String, String> attributes = new HashMap<>();
+ runner.enqueue("Some Text".getBytes(), attributes);
+
+ runner.run();
+
+ runner.assertQueueEmpty();
+ runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
+ }
+}
[2/5] incubator-nifi git commit: NIFI-333 - Adding failure test for
DecompressContent
Posted by ma...@apache.org.
NIFI-333 - Adding failure test for DecompressContent
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/361ac1f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/361ac1f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/361ac1f1
Branch: refs/heads/develop
Commit: 361ac1f1e5c8ee943220b337c92cd49616a02194
Parents: 0f8d00d
Author: danbress <db...@onyxconsults.com>
Authored: Sun Feb 15 09:08:22 2015 -0500
Committer: danbress <db...@onyxconsults.com>
Committed: Sun Feb 15 09:08:22 2015 -0500
----------------------------------------------------------------------
.../processors/standard/TestCompressContent.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/361ac1f1/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 71c8583..df1d506 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -109,4 +109,22 @@ public class TestCompressContent {
flowFile.assertAttributeEquals("filename", "SampleFile.txt.gz");
}
+
+ @Test
+ public void testDecompressFailure() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "decompress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+
+ byte[] data = new byte[]{1,2,3,4,5,6,7,8,9,10};
+ runner.enqueue(data);
+
+
+ assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+ runner.run();
+ runner.assertQueueEmpty();
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_FAILURE, 1);
+
+ runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data);
+ }
}