You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:44 UTC

[20/48] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 0000000,8520a55..2a81b64
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@@ -1,0 -1,370 +1,370 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.ByteArrayOutputStream;
+ import org.apache.nifi.stream.io.ByteCountingInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.IntegerHolder;
+ import org.apache.nifi.util.ObjectHolder;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.UUID;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"split", "text"})
+ @CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines")
+ public class SplitText extends AbstractProcessor {
+ 
+     // attribute keys
+     public static final String SPLIT_LINE_COUNT = "text.line.count";
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     public static final String FRAGMENT_COUNT = "fragment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ 
+     public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
+             .name("Line Split Count")
+             .description("The number of lines that will be added to each split file")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder()
+             .name("Header Line Count")
+             .description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files")
+             .required(true)
+             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+             .defaultValue("0")
+             .build();
+     public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder()
+             .name("Remove Trailing Newlines")
+             .description(
+                     "Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later")
+             .required(true)
+             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input file will be routed to this destination when it has been successfully split into 1 or more files").build();
+     public static final Relationship REL_SPLITS = new Relationship.Builder().name("splits").description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(LINE_SPLIT_COUNT);
+         properties.add(HEADER_LINE_COUNT);
+         properties.add(REMOVE_TRAILING_NEWLINES);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_ORIGINAL);
+         relationships.add(REL_SPLITS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Reads up to the given maximum number of lines, copying them to out
+      *
+      * @param in
+      * @param maxNumLines
+      * @param out
+      * @return the number of lines actually copied
+      * @throws IOException
+      */
+     private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines) throws IOException {
+         int numLines = 0;
+         for (int i = 0; i < maxNumLines; i++) {
+             final long bytes = countBytesToSplitPoint(in, out, keepAllNewLines || (i != maxNumLines - 1));
+             if (bytes <= 0) {
+                 return numLines;
+             }
+ 
+             numLines++;
+         }
+ 
+         return numLines;
+     }
+ 
+     private long countBytesToSplitPoint(final InputStream in, final OutputStream out, final boolean includeLineDelimiter) throws IOException {
+         int lastByte = -1;
+         long bytesRead = 0L;
+ 
+         while (true) {
+             in.mark(1);
+             final int nextByte = in.read();
+ 
+             // if we hit end of stream or new line we're done
+             if (nextByte == -1) {
+                 if (lastByte == '\r') {
+                     return includeLineDelimiter ? bytesRead : bytesRead - 1;
+                 } else {
+                     return bytesRead;
+                 }
+             }
+ 
+             // if there's an OutputStream to copy the data to, copy it, if appropriate.
+             // "if appropriate" means that it's not a line delimiter or that we want to copy line delimiters
+             bytesRead++;
+             if (out != null && (includeLineDelimiter || (nextByte != '\n' && nextByte != '\r'))) {
+                 out.write(nextByte);
+             }
+ 
+             // if we have a new line, then we're done
+             if (nextByte == '\n') {
+                 if (includeLineDelimiter) {
+                     return bytesRead;
+                 } else {
+                     return (lastByte == '\r') ? bytesRead - 2 : bytesRead - 1;
+                 }
+             }
+ 
+             // we didn't get a new line but if last byte was carriage return we've reached a new-line.
+             // so we roll back the last byte that we read and return
+             if (lastByte == '\r') {
+                 in.reset();
+                 bytesRead--;    // we reset the stream by 1 byte so decrement the number of bytes read by 1
+                 return includeLineDelimiter ? bytesRead : bytesRead - 1;
+             }
+ 
+             // keep track of what the last byte was that we read so that we can detect \r followed by some other
+             // character.
+             lastByte = nextByte;
+         }
+     }
+ 
+     private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
+         SplitInfo info = new SplitInfo();
+ 
+         while (info.lengthLines < numLines) {
+             final long bytesTillNext = countBytesToSplitPoint(in, null, keepAllNewLines || (info.lengthLines != numLines - 1));
+             if (bytesTillNext <= 0L) {
+                 break;
+             }
+ 
+             info.lengthLines++;
+             info.lengthBytes += bytesTillNext;
+         }
+ 
+         return info;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
+         final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+         final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
+ 
+         final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
+         final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
+ 
+         final long startNanos = System.nanoTime();
+         final List<FlowFile> splits = new ArrayList<>();
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
+                         final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+ 
+                     // if we have header lines, copy them into a ByteArrayOutputStream
+                     final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
+                     final int headerLinesCopied = readLines(in, headerCount, headerStream, true);
+                     if (headerLinesCopied < headerCount) {
+                         errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
+                         return;
+                     }
+ 
+                     while (true) {
+                         if (headerCount > 0) {
+                             // if we have header lines, create a new FlowFile, copy the header lines to that file,
+                             // and then start copying lines
+                             final IntegerHolder linesCopied = new IntegerHolder(0);
+                             FlowFile splitFile = session.create(flowFile);
+                             try {
+                                 splitFile = session.write(splitFile, new OutputStreamCallback() {
+                                     @Override
+                                     public void process(final OutputStream rawOut) throws IOException {
+                                         try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+                                             headerStream.writeTo(out);
+                                             linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines));
+                                         }
+                                     }
+                                 });
+                                 splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
+                                 logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()});
+                             } finally {
+                                 if (linesCopied.get() > 0) {
+                                     splits.add(splitFile);
+                                 } else {
+                                     // if the number of content lines is a multiple of the SPLIT_LINE_COUNT, 
+                                     // the last flow file will contain just a header; don't forward that one
+                                     session.remove(splitFile);
+                                 }
+                             }
+ 
+                             // If we copied fewer lines than what we want, then we're done copying data (we've hit EOF).
+                             if (linesCopied.get() < splitCount) {
+                                 break;
+                             }
+                         } else {
+                             // We have no header lines, so we can simply demarcate the original File via the
+                             // ProcessSession#clone method.
+                             long beforeReadingLines = in.getBytesConsumed();
+                             final SplitInfo info = countBytesToSplitPoint(in, splitCount, !removeTrailingNewlines);
+                             if (info.lengthBytes == 0) {
+                                 // stream is out of data
+                                 break;
+                             } else {
+                                 info.offsetBytes = beforeReadingLines;
+                                 splitInfos.add(info);
+                                 final long procNanos = System.nanoTime() - startNanos;
+                                 final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
+                                 logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
+                             }
+                         }
+                     }
+                 }
+             }
+         });
+ 
+         if (errorMessage.get() != null) {
+             logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
+             session.transfer(flowFile, REL_FAILURE);
+             if (splits != null && !splits.isEmpty()) {
+                 session.remove(splits);
+             }
+             return;
+         }
+ 
+         if (!splitInfos.isEmpty()) {
+             // Create the splits
+             for (final SplitInfo info : splitInfos) {
+                 FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
+                 split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
+                 splits.add(split);
+             }
+         }
+         finishFragmentAttributes(session, flowFile, splits);
+ 
+         if (splits.size() > 10) {
+             logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
+         } else {
+             logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
+         }
+ 
+         session.transfer(flowFile, REL_ORIGINAL);
+         session.transfer(splits, REL_SPLITS);
+     }
+ 
+     /**
+      * Apply split index, count and other attributes.
+      *
+      * @param session
+      * @param source
+      * @param unpacked
+      */
+     private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+         final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
+ 
+         final String fragmentId = UUID.randomUUID().toString();
+         final ArrayList<FlowFile> newList = new ArrayList<>(splits);
+         splits.clear();
+         for (int i = 1; i <= newList.size(); i++) {
+             FlowFile ff = newList.get(i - 1);
+             final Map<String, String> attributes = new HashMap<>();
+             attributes.put(FRAGMENT_ID, fragmentId);
+             attributes.put(FRAGMENT_INDEX, String.valueOf(i));
+             attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
+             attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
+             FlowFile newFF = session.putAllAttributes(ff, attributes);
+             splits.add(newFF);
+         }
+     }
+ 
+     private class SplitInfo {
+ 
+         public long offsetBytes;
+         public long lengthBytes;
+         public long lengthLines;
+ 
+         public SplitInfo() {
+             super();
+             this.offsetBytes = 0L;
+             this.lengthBytes = 0L;
+             this.lengthLines = 0L;
+         }
+ 
+         @SuppressWarnings("unused")
+         public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) {
+             super();
+             this.offsetBytes = offsetBytes;
+             this.lengthBytes = lengthBytes;
+             this.lengthLines = lengthLines;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
index 0000000,c5eda3d..1220b7c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
@@@ -1,0 -1,300 +1,300 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ 
+ import javax.xml.parsers.ParserConfigurationException;
+ import javax.xml.parsers.SAXParser;
+ import javax.xml.parsers.SAXParserFactory;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.XmlElementNotifier;
+ import org.apache.nifi.util.BooleanHolder;
+ 
+ import org.apache.commons.lang3.StringEscapeUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.xml.sax.Attributes;
+ import org.xml.sax.ContentHandler;
+ import org.xml.sax.InputSource;
+ import org.xml.sax.Locator;
+ import org.xml.sax.SAXException;
+ import org.xml.sax.XMLReader;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"xml", "split"})
+ @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, each comprising a child or descendant of the original root element")
+ public class SplitXml extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder()
+             .name("Split Depth")
+             .description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of 2 means split the root's children's children and so forth.")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("1")
+             .build();
+ 
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
+     public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     private static final String FEATURE_PREFIX = "http://xml.org/sax/features/";
+     public static final String ENABLE_NAMESPACES_FEATURE = FEATURE_PREFIX + "namespaces";
+     public static final String ENABLE_NAMESPACE_PREFIXES_FEATURE = FEATURE_PREFIX + "namespace-prefixes";
+     private static final SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+ 
+     static {
+         saxParserFactory.setNamespaceAware(true);
+         try {
+             saxParserFactory.setFeature(ENABLE_NAMESPACES_FEATURE, true);
+             saxParserFactory.setFeature(ENABLE_NAMESPACE_PREFIXES_FEATURE, true);
+         } catch (Exception e) {
+             final Logger staticLogger = LoggerFactory.getLogger(SplitXml.class);
+             staticLogger.warn("Unable to configure SAX Parser to make namespaces available", e);
+         }
+     }
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(SPLIT_DEPTH);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_ORIGINAL);
+         relationships.add(REL_SPLIT);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final FlowFile original = session.get();
+         if (original == null) {
+             return;
+         }
+ 
+         final int depth = context.getProperty(SPLIT_DEPTH).asInteger();
+         final ProcessorLog logger = getLogger();
+ 
+         final List<FlowFile> splits = new ArrayList<>();
+         final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(new XmlElementNotifier() {
+             @Override
+             public void onXmlElementFound(final String xmlTree) {
+                 FlowFile split = session.create(original);
+                 split = session.write(split, new OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream out) throws IOException {
+                         out.write(xmlTree.getBytes("UTF-8"));
+                     }
+                 });
+                 splits.add(split);
+             }
+         }, depth);
+ 
+         final BooleanHolder failed = new BooleanHolder(false);
+         session.read(original, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     SAXParser saxParser = null;
+                     try {
+                         saxParser = saxParserFactory.newSAXParser();
+                         final XMLReader reader = saxParser.getXMLReader();
+                         reader.setContentHandler(parser);
+                         reader.parse(new InputSource(in));
+                     } catch (final ParserConfigurationException | SAXException e) {
+                         logger.error("Unable to parse {} due to {}", new Object[]{original, e});
+                         failed.set(true);
+                     }
+                 }
+             }
+         });
+ 
+         if (failed.get()) {
+             session.transfer(original, REL_FAILURE);
+             session.remove(splits);
+         } else {
+             session.transfer(splits, REL_SPLIT);
+             session.transfer(original, REL_ORIGINAL);
+             logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
+         }
+     }
+ 
+     private static class XmlSplitterSaxParser implements ContentHandler {
+ 
+         private static final String XML_PROLOGUE = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
+         private final XmlElementNotifier notifier;
+         private final int splitDepth;
+         private final StringBuilder sb = new StringBuilder(XML_PROLOGUE);
+         private int depth = 0;
+ 
+         public XmlSplitterSaxParser(XmlElementNotifier notifier, int splitDepth) {
+             this.notifier = notifier;
+             this.splitDepth = splitDepth;
+         }
+ 
+         @Override
+         public void characters(char[] ch, int start, int length) throws SAXException {
+             // if we're not at a level where we care about capturing text, then return
+             if (depth <= splitDepth) {
+                 return;
+             }
+ 
+             // capture text
+             for (int i = start; i < start + length; i++) {
+                 char c = ch[i];
+                 switch (c) {
+                     case '<':
+                         sb.append("&lt;");
+                         break;
+                     case '>':
+                         sb.append("&gt;");
+                         break;
+                     case '&':
+                         sb.append("&amp;");
+                         break;
+                     case '\'':
+                         sb.append("&apos;");
+                         break;
+                     case '"':
+                         sb.append("&quot;");
+                         break;
+                     default:
+                         sb.append(c);
+                         break;
+                 }
+             }
+         }
+ 
+         @Override
+         public void endDocument() throws SAXException {
+         }
+ 
+         @Override
+         public void endElement(String uri, String localName, String qName) throws SAXException {
+             // We have finished processing this element. Decrement the depth.
+             int newDepth = --depth;
+ 
+             // if we're at a level where we care about capturing text, then add the closing element
+             if (newDepth >= splitDepth) {
+                 // Add the element end tag.
+                 sb.append("</").append(qName).append(">");
+             }
+ 
+             // If we have now returned to level 1, we have finished processing
+             // a 2nd-level element. Send notification with the XML text and
+             // erase the String Builder so that we can start
+             // processing a new 2nd-level element.
+             if (newDepth == splitDepth) {
+                 String elementTree = sb.toString();
+                 notifier.onXmlElementFound(elementTree);
+                 // Reset the StringBuilder to just the XML prolog.
+                 sb.setLength(XML_PROLOGUE.length());
+             }
+         }
+ 
+         @Override
+         public void endPrefixMapping(String prefix) throws SAXException {
+         }
+ 
+         @Override
+         public void ignorableWhitespace(char[] ch, int start, int length) throws SAXException {
+         }
+ 
+         @Override
+         public void processingInstruction(String target, String data) throws SAXException {
+         }
+ 
+         @Override
+         public void setDocumentLocator(Locator locator) {
+         }
+ 
+         @Override
+         public void skippedEntity(String name) throws SAXException {
+         }
+ 
+         @Override
+         public void startDocument() throws SAXException {
+         }
+ 
+         @Override
+         public void startElement(final String uri, final String localName, final String qName, final Attributes atts) throws SAXException {
+             // Increment the current depth because start a new XML element.
+             int newDepth = ++depth;
+             // Output the element and its attributes if it is
+             // not the root element.
+             if (newDepth > splitDepth) {
+                 sb.append("<");
+                 sb.append(qName);
+ 
+                 int attCount = atts.getLength();
+                 for (int i = 0; i < attCount; i++) {
+                     String attName = atts.getQName(i);
+                     String attValue = StringEscapeUtils.escapeXml10(atts.getValue(i));
+                     sb.append(" ").append(attName).append("=").append("\"").append(attValue).append("\"");
+                 }
+ 
+                 sb.append(">");
+             }
+         }
+ 
+         @Override
+         public void startPrefixMapping(String prefix, String uri) throws SAXException {
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
index 0000000,7385918..5e251f6
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
@@@ -1,0 -1,194 +1,194 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.stream.StreamResult;
+ import javax.xml.transform.stream.StreamSource;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ import org.apache.nifi.util.Tuple;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"xml", "xslt", "transform"})
+ @CapabilityDescription("Applies the provided XSLT file to the flowfile XML payload. A new FlowFile is created "
+         + "with transformed content and is routed to the 'success' relationship. If the XSL transform "
+         + "fails, the original FlowFile is routed to the 'failure' relationship")
+ public class TransformXml extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder()
+             .name("XSLT file name")
+             .description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content.")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(XSLT_FILE_NAME);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .name(propertyDescriptorName)
+                 .expressionLanguageSupported(true)
+                 .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                 .required(false)
+                 .dynamic(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final FlowFile original = session.get();
+         if (original == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final StopWatch stopWatch = new StopWatch(true);
+ 
+         try {
+             FlowFile transformed = session.write(original, new StreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn, final OutputStream out) throws IOException {
+                     try (final InputStream in = new BufferedInputStream(rawIn)) {
+ 
+                         File stylesheet = new File(context.getProperty(XSLT_FILE_NAME).getValue());
+                         StreamSource styleSource = new StreamSource(stylesheet);
+                         TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl();
+                         Transformer transformer = tfactory.newTransformer(styleSource);
+ 
+                         // pass all dynamic properties to the transformer
+                         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+                             if (entry.getKey().isDynamic()) {
+                                 String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
+                                 transformer.setParameter(entry.getKey().getName(), value);
+                             }
+                         }
+ 
+                         // use a StreamSource with Saxon
+                         StreamSource source = new StreamSource(in);
+                         StreamResult result = new StreamResult(out);
+                         transformer.transform(source, result);
+                     } catch (final Exception e) {
+                         throw new IOException(e);
+                     }
+                 }
+             });
+             session.transfer(transformed, REL_SUCCESS);
+             session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             logger.info("Transformed {}", new Object[]{original});
+         } catch (Exception e) {
+             logger.error("Unable to transform {} due to {}", new Object[]{original, e});
+             session.transfer(original, REL_FAILURE);
+         }
+     }
+ 
+     @SuppressWarnings("unused")
+     private static final class XsltValidator implements Validator {
+ 
+         private volatile Tuple<String, ValidationResult> cachedResult;
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             final Tuple<String, ValidationResult> lastResult = this.cachedResult;
+             if (lastResult != null && lastResult.getKey().equals(input)) {
+                 return lastResult.getValue();
+             } else {
+                 String error = null;
+                 final File stylesheet = new File(input);
+                 final TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl();
+                 final StreamSource styleSource = new StreamSource(stylesheet);
+ 
+                 try {
+                     tfactory.newTransformer(styleSource);
+                 } catch (final Exception e) {
+                     error = e.toString();
+                 }
+ 
+                 this.cachedResult = new Tuple<>(input,
+                         new ValidationResult.Builder()
+                         .input(input)
+                         .subject(subject)
+                         .valid(error == null)
+                         .explanation(error).build());
+                 return this.cachedResult.getValue();
+             }
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 0000000,dc6daea..b30e780
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@@ -1,0 -1,427 +1,427 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Path;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.util.FlowFileUnpackager;
+ import org.apache.nifi.util.FlowFileUnpackagerV1;
+ import org.apache.nifi.util.FlowFileUnpackagerV2;
+ import org.apache.nifi.util.FlowFileUnpackagerV3;
+ import org.apache.nifi.util.ObjectHolder;
+ 
+ import org.apache.commons.compress.archivers.ArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+ import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"Unpack", "un-merge", "tar", "zip", "archive", "flowfile-stream", "flowfile-stream-v3"})
+ @CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many FlowFiles for each input FlowFile")
+ public class UnpackContent extends AbstractProcessor {
+ 
+     public static final String AUTO_DETECT_FORMAT = "use mime.type attribute";
+     public static final String TAR_FORMAT = "tar";
+     public static final String ZIP_FORMAT = "zip";
+     public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3";
+     public static final String FLOWFILE_STREAM_FORMAT_V2 = "flowfile-stream-v2";
+     public static final String FLOWFILE_TAR_FORMAT = "flowfile-tar-v1";
+ 
+     // attribute keys
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     public static final String FRAGMENT_COUNT = "fragment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ 
+     public static final String OCTET_STREAM = "application/octet-stream";
+ 
+     public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder()
+             .name("Packaging Format")
+             .description("The Packaging Format used to create the file")
+             .required(true)
+             .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT)
+             .defaultValue(AUTO_DETECT_FORMAT)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Unpacked FlowFiles are sent to this relationship").build();
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is sent to this relationship after it has been successfully unpacked").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason").build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_ORIGINAL);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(PACKAGING_FORMAT);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase();
+         if (AUTO_DETECT_FORMAT.equals(packagingFormat)) {
+             final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+             if (mimeType == null) {
+                 logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile});
+                 session.transfer(flowFile, REL_FAILURE);
+                 return;
+             }
+ 
+             switch (mimeType.toLowerCase()) {
+                 case "application/tar":
+                     packagingFormat = TAR_FORMAT;
+                     break;
+                 case "application/zip":
+                     packagingFormat = ZIP_FORMAT;
+                     break;
+                 case "application/flowfile-v3":
+                     packagingFormat = FLOWFILE_STREAM_FORMAT_V3;
+                     break;
+                 case "application/flowfile-v2":
+                     packagingFormat = FLOWFILE_STREAM_FORMAT_V2;
+                     break;
+                 case "application/flowfile-v1":
+                     packagingFormat = FLOWFILE_TAR_FORMAT;
+                     break;
+                 default: {
+                     logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType});
+                     session.transfer(flowFile, REL_SUCCESS);
+                     return;
+                 }
+             }
+         }
+ 
+         final Unpacker unpacker;
+         final boolean addFragmentAttrs;
+         switch (packagingFormat) {
+             case TAR_FORMAT:
+                 unpacker = new TarUnpacker();
+                 addFragmentAttrs = true;
+                 break;
+             case ZIP_FORMAT:
+                 unpacker = new ZipUnpacker();
+                 addFragmentAttrs = true;
+                 break;
+             case FLOWFILE_STREAM_FORMAT_V2:
+                 unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
+                 addFragmentAttrs = false;
+                 break;
+             case FLOWFILE_STREAM_FORMAT_V3:
+                 unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
+                 addFragmentAttrs = false;
+                 break;
+             case FLOWFILE_TAR_FORMAT:
+                 unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
+                 addFragmentAttrs = false;
+                 break;
+             default:
+                 throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue());
+         }
+ 
+         final List<FlowFile> unpacked = new ArrayList<>();
+         try {
+             unpacker.unpack(session, flowFile, unpacked);
+             if (unpacked.isEmpty()) {
+                 logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile});
+                 session.transfer(flowFile, REL_FAILURE);
+                 return;
+             }
+ 
+             if (addFragmentAttrs) {
+                 finishFragmentAttributes(session, flowFile, unpacked);
+             }
+             session.transfer(unpacked, REL_SUCCESS);
+             session.transfer(flowFile, REL_ORIGINAL);
+             session.getProvenanceReporter().fork(flowFile, unpacked);
+             logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});
+         } catch (final ProcessException e) {
+             logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e});
+             session.transfer(flowFile, REL_FAILURE);
+             session.remove(unpacked);
+         }
+     }
+ 
+     private static interface Unpacker {
+ 
+         void unpack(ProcessSession session, FlowFile source, List<FlowFile> unpacked);
+     }
+ 
+     private static class TarUnpacker implements Unpacker {
+ 
+         @Override
+         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+             final String fragmentId = UUID.randomUUID().toString();
+             session.read(source, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     int fragmentCount = 0;
+                     try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(in))) {
+                         TarArchiveEntry tarEntry;
+                         while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+                             if (tarEntry.isDirectory()) {
+                                 continue;
+                             }
+                             final File file = new File(tarEntry.getName());
+                             final Path filePath = file.toPath();
+                             final String filePathString = filePath.getParent() + "/";
+                             final Path absPath = filePath.toAbsolutePath();
+                             final String absPathString = absPath.getParent().toString() + "/";
+ 
+                             FlowFile unpackedFile = session.create(source);
+                             try {
+                                 final Map<String, String> attributes = new HashMap<>();
+                                 attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                                 attributes.put(CoreAttributes.PATH.key(), filePathString);
+                                 attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                                 attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ 
+                                 attributes.put(FRAGMENT_ID, fragmentId);
+                                 attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+ 
+                                 unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+ 
+                                 final long fileSize = tarEntry.getSize();
+                                 unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
+                                     @Override
+                                     public void process(final OutputStream out) throws IOException {
+                                         StreamUtils.copy(tarIn, out, fileSize);
+                                     }
+                                 });
+                             } finally {
+                                 unpacked.add(unpackedFile);
+                             }
+                         }
+                     }
+                 }
+             });
+         }
+     }
+ 
+     private static class ZipUnpacker implements Unpacker {
+ 
+         @Override
+         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+             final String fragmentId = UUID.randomUUID().toString();
+             session.read(source, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     int fragmentCount = 0;
+                     try (final ZipArchiveInputStream zipIn = new ZipArchiveInputStream(new BufferedInputStream(in))) {
+                         ArchiveEntry zipEntry;
+                         while ((zipEntry = zipIn.getNextEntry()) != null) {
+                             if (zipEntry.isDirectory()) {
+                                 continue;
+                             }
+                             final File file = new File(zipEntry.getName());
+                             final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
+                             final Path absPath = file.toPath().toAbsolutePath();
+                             final String absPathString = absPath.getParent().toString() + "/";
+ 
+                             FlowFile unpackedFile = session.create(source);
+                             try {
+                                 final Map<String, String> attributes = new HashMap<>();
+                                 attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                                 attributes.put(CoreAttributes.PATH.key(), parentDirectory);
+                                 attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                                 attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+ 
+                                 attributes.put(FRAGMENT_ID, fragmentId);
+                                 attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+ 
+                                 unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                                 unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
+                                     @Override
+                                     public void process(final OutputStream out) throws IOException {
+                                         StreamUtils.copy(zipIn, out);
+                                     }
+                                 });
+                             } finally {
+                                 unpacked.add(unpackedFile);
+                             }
+                         }
+                     }
+                 }
+             });
+         }
+     }
+ 
+     private static class FlowFileStreamUnpacker implements Unpacker {
+ 
+         private final FlowFileUnpackager unpackager;
+ 
+         public FlowFileStreamUnpacker(final FlowFileUnpackager unpackager) {
+             this.unpackager = unpackager;
+         }
+ 
+         @Override
+         public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+             session.read(source, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn) throws IOException {
+                     try (final InputStream in = new BufferedInputStream(rawIn)) {
+                         while (unpackager.hasMoreData()) {
+                             final ObjectHolder<Map<String, String>> attributesRef = new ObjectHolder<>(null);
+                             FlowFile unpackedFile = session.create(source);
+                             try {
+                                 unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
+                                     @Override
+                                     public void process(final OutputStream rawOut) throws IOException {
+                                         try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                             final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out);
+                                             if (attributes == null) {
+                                                 throw new IOException("Failed to unpack " + source + ": stream had no Attributes");
+                                             }
+                                             attributesRef.set(attributes);
+                                         }
+                                     }
+                                 });
+ 
+                                 final Map<String, String> attributes = attributesRef.get();
+ 
+                                 // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile.
+                                 // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package
+                                 // and later unpack it -- in this case, we have two FlowFiles with the same UUID.
+                                 attributes.remove(CoreAttributes.UUID.key());
+ 
+                                 // maintain backward compatibility with legacy NiFi attribute names
+                                 mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key());
+                                 mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key());
+                                 mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key());
+                                 mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key());
+ 
+                                 if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
+                                     attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+                                 }
+ 
+                                 unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                             } finally {
+                                 unpacked.add(unpackedFile);
+                             }
+                         }
+                     }
+                 }
+             });
+         }
+     }
+ 
+     /**
+      * Maps attributes from legacy nifi to the new naming scheme
+      *
+      * @param attributes
+      * @param oldKey
+      * @param newKey
+      */
+     private static void mapAttributes(final Map<String, String> attributes, final String oldKey, final String newKey) {
+         if (!attributes.containsKey(newKey) && attributes.containsKey(oldKey)) {
+             attributes.put(newKey, attributes.get(oldKey));
+         }
+     }
+ 
+     /**
+      * If the unpacked flowfiles contain fragment index attributes, then we need
+      * to apply fragment count and other attributes for completeness.
+      *
+      * @param session
+      * @param source
+      * @param unpacked
+      */
+     private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+         // first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments
+         int fragmentCount = 0;
+         for (FlowFile ff : unpacked) {
+             String fragmentIndex = ff.getAttribute(FRAGMENT_INDEX);
+             if (fragmentIndex != null) {
+                 fragmentCount++;
+             } else {
+                 return;
+             }
+         }
+ 
+         String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
+         if (originalFilename.endsWith(".tar") || originalFilename.endsWith(".zip") || originalFilename.endsWith(".pkg")) {
+             originalFilename = originalFilename.substring(0, originalFilename.length() - 4);
+         }
+ 
+         // second pass adds fragment attributes
+         ArrayList<FlowFile> newList = new ArrayList<>(unpacked);
+         unpacked.clear();
+         for (FlowFile ff : newList) {
+             final Map<String, String> attributes = new HashMap<>();
+             attributes.put(FRAGMENT_COUNT, String.valueOf(fragmentCount));
+             attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
+             FlowFile newFF = session.putAllAttributes(ff, attributes);
+             unpacked.add(newFF);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java
index 0000000,8f2001a..4808a59
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java
@@@ -1,0 -1,147 +1,147 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import javax.xml.transform.stream.StreamSource;
+ import javax.xml.validation.Schema;
+ import javax.xml.validation.SchemaFactory;
+ import javax.xml.validation.Validator;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.BooleanHolder;
+ 
+ import org.xml.sax.SAXException;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"xml", "schema", "validation", "xsd"})
+ @CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file")
+ public class ValidateXml extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder()
+             .name("Schema File")
+             .description("The path to the Schema file that is to be used for validation")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+ 
+     public static final Relationship REL_VALID = new Relationship.Builder().name("valid").description("FlowFiles that are successfully validated against the schema are routed to this relationship").build();
+     public static final Relationship REL_INVALID = new Relationship.Builder().name("invalid").description("FlowFiles that are not valid according to the specified schema are routed to this relationship").build();
+ 
+     private static final String SCHEMA_LANGUAGE = "http://www.w3.org/2001/XMLSchema";
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+     private final AtomicReference<Schema> schemaRef = new AtomicReference<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(SCHEMA_FILE);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_VALID);
+         relationships.add(REL_INVALID);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @OnScheduled
+     public void parseSchema(final ProcessContext context) throws IOException, SAXException {
+         try {
+             final File file = new File(context.getProperty(SCHEMA_FILE).getValue());
+             final SchemaFactory schemaFactory = SchemaFactory.newInstance(SCHEMA_LANGUAGE);
+             final Schema schema = schemaFactory.newSchema(file);
+             this.schemaRef.set(schema);
+         } catch (final SAXException e) {
+             throw e;
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final List<FlowFile> flowFiles = session.get(50);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final Schema schema = schemaRef.get();
+         final Validator validator = schema.newValidator();
+         final ProcessorLog logger = getLogger();
+ 
+         for (final FlowFile flowFile : flowFiles) {
+             final BooleanHolder valid = new BooleanHolder(true);
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     try {
+                         validator.validate(new StreamSource(in));
+                     } catch (final IllegalArgumentException | SAXException e) {
+                         valid.set(false);
+                         logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e});
+                     }
+                 }
+             });
+ 
+             if (valid.get()) {
+                 logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
+                 session.getProvenanceReporter().route(flowFile, REL_VALID);
+                 session.transfer(flowFile, REL_VALID);
+             } else {
+                 logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile});
+                 session.getProvenanceReporter().route(flowFile, REL_INVALID);
+                 session.transfer(flowFile, REL_INVALID);
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
index 0000000,8548c46..ec3211c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/RESTServiceContentModified.java
@@@ -1,0 -1,78 +1,78 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.text.SimpleDateFormat;
+ import java.util.Locale;
+ import java.util.TimeZone;
+ 
+ import javax.servlet.http.HttpServlet;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+ 
+ public class RESTServiceContentModified extends HttpServlet {
+ 
+     private static final long serialVersionUID = 1L;
+     static String result = "[\"sample1\",\"sample2\",\"sample3\",\"sample4\"]";
+     static long modificationDate = System.currentTimeMillis() / 1000 * 1000; // time resolution is to the second
+     static int ETAG;
+     public static boolean IGNORE_ETAG = false;
+     public static boolean IGNORE_LAST_MODIFIED = false;
+ 
+     public RESTServiceContentModified() {
 -        this.ETAG = this.hashCode();
++        ETAG = this.hashCode();
+     }
+ 
+     @Override
+     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+         String ifModifiedSince = request.getHeader("If-Modified-Since");
+         String ifNoneMatch = request.getHeader("If-None-Match");
+ 
+         final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
+         dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+ 
+         response.setContentType("application/json");
+         if (ifNoneMatch != null && ifNoneMatch.length() > 0 && !IGNORE_ETAG && Integer.parseInt(ifNoneMatch) == ETAG) {
+             response.setStatus(304);
+             response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+             response.setHeader("ETag", Integer.toString(ETAG));
+             return;
+         }
+ 
+         long date = -1;
+         if (ifModifiedSince != null && ifModifiedSince.length() > 0 && !IGNORE_LAST_MODIFIED) {
+             try {
+                 date = dateFormat.parse(ifModifiedSince).getTime();
+             } catch (Exception e) {
+ 
+             }
+         }
+         if (date >= modificationDate) {
+             response.setStatus(304);
+             response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+             response.setHeader("ETag", Integer.toString(ETAG));
+             return;
+         }
+ 
+         response.setStatus(200);
+         response.setHeader("Last-Modified", dateFormat.format(modificationDate));
+         response.setHeader("ETag", Integer.toString(ETAG));
+         response.getOutputStream().println(result);
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 0000000,1af48d6..71c8583
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@@ -1,0 -1,111 +1,112 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
++import static org.junit.Assert.assertTrue;
++
+ import java.io.IOException;
 -import java.nio.file.Files;
+ import java.nio.file.Paths;
++
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
 -import static org.junit.Assert.assertTrue;
+ import org.junit.Test;
+ 
+ public class TestCompressContent {
+ 
+     @Test
+     public void testBzip2DecompressConcatenated() throws Exception {
+         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+         runner.setProperty(CompressContent.MODE, "decompress");
+         runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2");
+         runner.setProperty(CompressContent.UPDATE_FILENAME, "false");
+ 
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFileConcat.txt.bz2"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFileConcat.txt"));
+         flowFile.assertAttributeEquals("filename", "SampleFileConcat.txt.bz2"); // not updating filename
+     }
+ 
+     @Test
+     public void testBzip2Decompress() throws Exception {
+         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+         runner.setProperty(CompressContent.MODE, "decompress");
+         runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2");
+         runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+ 
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.bz2"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+         flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ 
+         runner.clearTransferState();
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.bz2"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+         flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
+     }
+ 
+     @Test
+     public void testGzipDecompress() throws Exception {
+         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+         runner.setProperty(CompressContent.MODE, "decompress");
+         runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+         assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+ 
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.gz"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+         flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ 
+         runner.clearTransferState();
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.gz"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+         flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
+     }
+ 
+     @Test
+     public void testFilenameUpdatedOnCompress() throws IOException {
+         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+         runner.setProperty(CompressContent.MODE, "compress");
+         runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip");
+         assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+ 
+         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+         MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+         flowFile.assertAttributeEquals("filename", "SampleFile.txt.gz");
+ 
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
index 0000000,6092761..1b057d9
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertCharacterSet.java
@@@ -1,0 -1,47 +1,44 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
 -import org.apache.nifi.processors.standard.ConvertCharacterSet;
+ import java.io.File;
+ import java.io.IOException;
 -import java.nio.file.Files;
+ import java.nio.file.Paths;
+ 
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
 -
+ import org.junit.Test;
+ 
+ public class TestConvertCharacterSet {
+ 
+     @Test
+     public void test() throws IOException {
+         final TestRunner runner = TestRunners.newTestRunner(new ConvertCharacterSet());
+         runner.setProperty(ConvertCharacterSet.INPUT_CHARSET, "ASCII");
+         runner.setProperty(ConvertCharacterSet.OUTPUT_CHARSET, "UTF-32");
+ 
+         runner.enqueue(Paths.get("src/test/resources/CharacterSetConversionSamples/Original.txt"));
+         runner.run();
+ 
+         runner.assertAllFlowFilesTransferred(ConvertCharacterSet.REL_SUCCESS, 1);
+         final MockFlowFile output = runner.getFlowFilesForRelationship(ConvertCharacterSet.REL_SUCCESS).get(0);
+         output.assertContentEquals(new File("src/test/resources/CharacterSetConversionSamples/Converted2.txt"));
+     }
+ 
+ }