You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/16 17:57:47 UTC

[2/8] incubator-nifi git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 0000000,6e5f65d..15591d7
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@@ -1,0 -1,261 +1,284 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.util;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
++import java.util.Set;
+ 
+ import org.apache.nifi.components.ConfigurableComponent;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
++import org.apache.nifi.processor.Processor;
++import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.SchedulingContext;
+ import org.junit.Assert;
+ 
+ public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup {
+ 
+     private final ConfigurableComponent component; // component whose descriptors and validation this context delegates to
+     private final Map<PropertyDescriptor, String> properties = new HashMap<>(); // values stored by setProperty, keyed by the component's descriptor
+ 
+     private String annotationData = null;
+     private boolean yieldCalled = false; // set to true by yield()
+     private boolean enableExpressionValidation = false; // getProperty passes the descriptor to MockPropertyValue only when both expression flags are true
+     private boolean allowExpressionValidation = true;
+ 
++    private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); // replaced wholesale by setUnavailableRelationships
++
+     /**
+      * Creates a new MockProcessContext backed by the given component.
+      *
+      * @param component the component this context serves; must not be null
+      * @throws NullPointerException if {@code component} is null
+      */
+     public MockProcessContext(final ConfigurableComponent component) {
+         // requireNonNull is the static import of java.util.Objects.requireNonNull
+         this.component = requireNonNull(component);
+     }
+ 
+     /**
+      * Creates a context for a Controller Service, copying the annotation data
+      * and property values that the supplied context holds for that service.
+      * An IllegalArgumentException raised while reading them is swallowed.
+      *
+      * @param component the Controller Service this context serves
+      * @param context the context to read the service's configuration from
+      */
+     public MockProcessContext(final ControllerService component, final MockProcessContext context) {
+         this(component);
+ 
+         try {
+             this.annotationData = context.getControllerServiceAnnotationData(component);
+             this.properties.putAll(context.getControllerServiceProperties(component));
+         } catch (final IllegalArgumentException ignored) {
+             // do nothing...the service is being loaded
+         }
+     }
+ 
+     /**
+      * Resolves the property by the descriptor's name; see {@link #getProperty(String)}.
+      *
+      * @param descriptor descriptor whose name identifies the property
+      * @return the result of the name-based lookup
+      */
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         final String propertyName = descriptor.getName();
+         return getProperty(propertyName);
+     }
+ 
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
+         if (descriptor == null) {
+             return null;
+         }
+ 
+         final String setPropertyValue = properties.get(descriptor);
+         final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
+         return new MockPropertyValue(propValue, this, (enableExpressionValidation && allowExpressionValidation) ? descriptor : null);
+     }
+ 
+     /**
+      * Wraps a raw string in a MockPropertyValue bound to this context.
+      *
+      * @param rawValue the raw value to wrap
+      * @return a new MockPropertyValue for {@code rawValue}
+      */
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         final PropertyValue wrapped = new MockPropertyValue(rawValue, this);
+         return wrapped;
+     }
+ 
+     /**
+      * Sets a property by name by building a name-only descriptor and delegating
+      * to {@link #setProperty(PropertyDescriptor, String)}.
+      *
+      * @param propertyName name of the property to set
+      * @param propertyValue value to assign
+      * @return the validation result produced by the delegate
+      */
+     public ValidationResult setProperty(final String propertyName, final String propertyValue) {
+         final PropertyDescriptor nameOnlyDescriptor = new PropertyDescriptor.Builder().name(propertyName).build();
+         return setProperty(nameOnlyDescriptor, propertyValue);
+     }
+ 
+     /**
+      * Validates the given value against the component's descriptor of the same
+      * name and stores it. The value is stored whether or not validation
+      * succeeds; the returned ValidationResult tells the caller whether the value
+      * is valid. If the new value differs from the previous one (or from the
+      * descriptor's default when nothing was set), the component's
+      * onPropertyModified callback is invoked.
+      *
+      * @param descriptor descriptor whose name identifies the property; must not be null
+      * @param value the new value; must not be null (use removeProperty to clear a property)
+      * @return the result of validating {@code value} with the component's descriptor
+      * @throws NullPointerException if {@code descriptor} or {@code value} is null
+      */
+     public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
+         requireNonNull(descriptor);
+         requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
+         final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
+ 
+         final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this));
+         String oldValue = properties.put(fullyPopulatedDescriptor, value);
+         if (oldValue == null) {
+             oldValue = fullyPopulatedDescriptor.getDefaultValue();
+         }
+         // value is guaranteed non-null here, so a plain equals() check is sufficient.
+         if (!value.equals(oldValue)) {
+             component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value);
+         }
+ 
+         return result;
+     }
+ 
+     /**
+      * Removes the stored value of a non-required property and notifies the
+      * component through onPropertyModified.
+      *
+      * @param descriptor descriptor whose name identifies the property; must not be null
+      * @return {@code true} if a value was removed; {@code false} if the property
+      * is required or had no stored value
+      */
+     public boolean removeProperty(final PropertyDescriptor descriptor) {
+         Objects.requireNonNull(descriptor);
+         final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
+         if (fullyPopulatedDescriptor.isRequired()) {
+             return false;
+         }
+ 
+         final String removedValue = properties.remove(fullyPopulatedDescriptor);
+         if (removedValue == null) {
+             return false;
+         }
+ 
+         component.onPropertyModified(fullyPopulatedDescriptor, removedValue, null);
+         return true;
+     }
+ 
+     /**
+      * Records that a yield was requested; query it with {@link #isYieldCalled()}.
+      */
+     @Override
+     public void yield() {
+         this.yieldCalled = true;
+     }
+ 
+     /**
+      * @return {@code true} once {@link #yield()} has been called on this context
+      */
+     public boolean isYieldCalled() {
+         return this.yieldCalled;
+     }
+ 
+     /**
+      * Registers a Controller Service and applies the given properties and
+      * annotation data to its configuration.
+      *
+      * @param serviceIdentifier identifier of the service
+      * @param controllerService the service to register; must not be null
+      * @param properties property values to set on the service's configuration
+      * @param annotationData annotation data to set on the service's configuration
+      */
+     public void addControllerService(final String serviceIdentifier, final ControllerService controllerService, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+         requireNonNull(controllerService);
+         // NOTE(review): serviceIdentifier is not passed on to the single-argument overload - confirm this is intended.
+         final ControllerServiceConfiguration serviceConfig = addControllerService(controllerService);
+         serviceConfig.setProperties(properties);
+         serviceConfig.setAnnotationData(annotationData);
+     }
+ 
+     /**
+      * @return always 1 for this mock
+      */
+     @Override
+     public int getMaxConcurrentTasks() {
+         final int singleTask = 1;
+         return singleTask;
+     }
+ 
+     /**
+      * Replaces the annotation data returned by {@link #getAnnotationData()}.
+      *
+      * @param annotationData the new annotation data
+      */
+     public void setAnnotationData(final String annotationData) {
+         final String updated = annotationData;
+         this.annotationData = updated;
+     }
+ 
+     /**
+      * @return the annotation data currently held by this context
+      */
+     @Override
+     public String getAnnotationData() {
+         return this.annotationData;
+     }
+ 
+     /**
+      * Returns the property values of this context. When the component reports
+      * supported descriptors, the result is a new insertion-ordered map that
+      * lists every supported descriptor (mapped to {@code null} unless a value
+      * has been set) followed by any other set properties. Otherwise an
+      * unmodifiable view of the set properties is returned.
+      *
+      * @return map from descriptor to its set value
+      */
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
+         final boolean noneSupported = (supported == null) || supported.isEmpty();
+         if (noneSupported) {
+             return Collections.unmodifiableMap(properties);
+         }
+ 
+         // Seed with every supported descriptor first so that their order is kept.
+         final Map<PropertyDescriptor, String> merged = new LinkedHashMap<>();
+         for (final PropertyDescriptor supportedDescriptor : supported) {
+             merged.put(supportedDescriptor, null);
+         }
+         merged.putAll(properties);
+         return merged;
+     }
+ 
+     /**
+      * Validates the current configuration by delegating to the component's
+      * validate method with a MockValidationContext that wraps this context.
+      *
+      * @return the validation results as returned by the component
+      */
+     public Collection<ValidationResult> validate() {
+         final MockValidationContext validationContext = new MockValidationContext(this);
+         return component.validate(validationContext);
+     }
+ 
+     public boolean isValid() {
+         for (final ValidationResult result : validate()) {
+             if (!result.isValid()) {
+                 return false;
+             }
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * Fails the current JUnit test if {@link #validate()} reports any invalid
+      * result; the failure message lists every invalid result on its own line.
+      */
+     public void assertValid() {
+         final StringBuilder failures = new StringBuilder();
+         int failureCount = 0;
+ 
+         for (final ValidationResult outcome : validate()) {
+             if (outcome.isValid()) {
+                 continue;
+             }
+ 
+             failureCount++;
+             failures.append(outcome.toString()).append("\n");
+         }
+ 
+         if (failureCount == 0) {
+             return;
+         }
+ 
+         Assert.fail("Processor has " + failureCount + " validation failures:\n" + failures.toString());
+     }
+ 
+     /**
+      * Mock encryption: wraps the input as {@code enc{...}} without transforming it.
+      *
+      * @param unencrypted the text to wrap
+      * @return {@code "enc{" + unencrypted + "}"}
+      */
+     @Override
+     public String encrypt(final String unencrypted) {
+         final StringBuilder wrapped = new StringBuilder("enc{");
+         wrapped.append(unencrypted).append("}");
+         return wrapped.toString();
+     }
+ 
+     /**
+      * Mock decryption: the inverse of {@link #encrypt(String)}. If the input has
+      * the form {@code enc{...}}, the wrapped text is returned; any other input
+      * is returned unchanged.
+      *
+      * @param encrypted the text to unwrap; must not be null
+      * @return the text between {@code "enc{"} and the trailing {@code "}"}, or
+      * {@code encrypted} itself if it is not in that form
+      */
+     @Override
+     public String decrypt(final String encrypted) {
+         if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
+             // Drop the 4-character "enc{" prefix and only the single trailing "}".
+             // The end index is exclusive, so length() - 1 keeps the payload's last character.
+             return encrypted.substring(4, encrypted.length() - 1);
+         }
+         return encrypted;
+     }
+ 
+     /**
+      * Sets the flag that allows expression validation. The descriptor is passed
+      * on by getProperty only when this flag and the enable flag are both true.
+      *
+      * @param validate whether expression validation is allowed
+      */
+     public void setValidateExpressionUsage(final boolean validate) {
+         this.allowExpressionValidation = validate;
+     }
+ 
+     /**
+      * Turns the expression-validation enable flag on.
+      */
+     public void enableExpressionValidation() {
+         this.enableExpressionValidation = true;
+     }
+ 
+     /**
+      * Turns the expression-validation enable flag off.
+      */
+     public void disableExpressionValidation() {
+         this.enableExpressionValidation = false;
+     }
+ 
+     Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
+         return super.getConfiguration(controllerService.getIdentifier()).getProperties();
+     }
+ 
+     String getControllerServiceAnnotationData(final ControllerService controllerService) {
+         return super.getConfiguration(controllerService.getIdentifier()).getAnnotationData();
+     }
+ 
+     /**
+      * @return this context, which itself acts as the ControllerServiceLookup
+      */
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         final ControllerServiceLookup self = this;
+         return self;
+     }
+ 
+     /**
+      * No-op in this mock.
+      *
+      * @param identifier identifier of the Controller Service (ignored)
+      */
+     @Override
+     public void leaseControllerService(final String identifier) {
+         // intentionally empty
+     }
+ 
++    /**
++     * Returns the relationships of the underlying Processor minus those marked
++     * unavailable through {@link #setUnavailableRelationships(Set)}.
++     *
++     * @return a new set of available relationships, or an empty set when the
++     * component is not a Processor
++     */
++    public Set<Relationship> getAvailableRelationships() {
++        if (component instanceof Processor) {
++            final Processor processor = (Processor) component;
++            final Set<Relationship> available = new HashSet<>(processor.getRelationships());
++            available.removeAll(unavailableRelationships);
++            return available;
++        }
++
++        return Collections.emptySet();
++    }
++
++    /**
++     * Replaces the set of unavailable relationships with an unmodifiable copy
++     * of the given set.
++     *
++     * @param relationships relationships to treat as unavailable
++     */
++    public void setUnavailableRelationships(final Set<Relationship> relationships) {
++        final Set<Relationship> snapshot = new HashSet<>(relationships);
++        this.unavailableRelationships = Collections.unmodifiableSet(snapshot);
++    }
++
++    /**
++     * @return the set of relationships currently marked unavailable
++     */
++    public Set<Relationship> getUnavailableRelationships() {
++        return this.unavailableRelationships;
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 0000000,552780c..ea55b34
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@@ -1,0 -1,1010 +1,1006 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.util;
+ 
+ import java.io.ByteArrayInputStream;
+ import java.io.ByteArrayOutputStream;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.OpenOption;
+ import java.nio.file.Path;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.regex.Pattern;
+ 
+ import org.junit.Assert;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.FlowFileFilter;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ 
+ public class MockProcessSession implements ProcessSession {
+ 
+     private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); // FlowFiles routed via transfer(flowFile, relationship)
+     private final MockFlowFileQueue processorQueue; // obtained from sharedState in the constructor
+     private final Set<Long> beingProcessed = new HashSet<>(); // ids of FlowFiles obtained or created but not yet transferred or removed
+ 
+     private final Map<Long, MockFlowFile> currentVersions = new HashMap<>(); // latest MockFlowFile instance per id
+     private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); // FlowFiles as pulled from the queue; re-queued by rollback
+     private final SharedSessionState sharedState;
+     private final Map<String, Long> counterMap = new HashMap<>(); // non-immediate counter deltas, flushed to sharedState on commit
+ 
+     private boolean committed = false;
+     private boolean rolledback = false;
+     private int removedCount = 0; // incremented by remove(FlowFile)
+ 
+     public MockProcessSession(final SharedSessionState sharedState) {
+         this.sharedState = sharedState;
+         this.processorQueue = sharedState.getFlowFileQueue();
+     }
+ 
+     /**
+      * Adjusts a named counter. Immediate adjustments go straight to the shared
+      * state; all others are accumulated locally until {@link #commit()}.
+      *
+      * @param name counter name
+      * @param delta amount to add to the counter
+      * @param immediate whether to bypass the session-local accumulation
+      */
+     @Override
+     public void adjustCounter(final String name, final long delta, final boolean immediate) {
+         if (immediate) {
+             sharedState.adjustCounter(name, delta);
+         } else {
+             final Long pending = counterMap.get(name);
+             final long updated = (pending == null) ? delta : pending + delta;
+             counterMap.put(name, updated);
+         }
+     }
+ 
+     /**
+      * Creates a copy of the given FlowFile under a new id and starts tracking it.
+      *
+      * @param flowFile the FlowFile to copy
+      * @return the newly created copy
+      */
+     @Override
+     public MockFlowFile clone(final FlowFile flowFile) {
+         validateState(flowFile);
+ 
+         final long copyId = sharedState.nextFlowFileId();
+         final MockFlowFile copy = new MockFlowFile(copyId, flowFile);
+         currentVersions.put(copy.getId(), copy);
+         beingProcessed.add(copy.getId());
+         return copy;
+     }
+ 
+     /**
+      * Creates a copy of the given FlowFile whose content is the byte range
+      * {@code [offset, offset + size)} of the source content.
+      *
+      * @param flowFile the FlowFile to copy
+      * @param offset index of the first content byte to copy
+      * @param size number of content bytes to copy
+      * @return the newly created copy
+      * @throws FlowFileHandlingException if {@code offset + size} exceeds the FlowFile's size
+      */
+     @Override
+     public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) {
+         validateState(flowFile);
+         final long endExclusive = offset + size;
+         if (endExclusive > flowFile.getSize()) {
+             throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString());
+         }
+ 
+         final MockFlowFile copy = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
+         final byte[] sourceContent = ((MockFlowFile) flowFile).getData();
+         copy.setData(Arrays.copyOfRange(sourceContent, (int) offset, (int) endExclusive));
+ 
+         currentVersions.put(copy.getId(), copy);
+         beingProcessed.add(copy.getId());
+         return copy;
+     }
+ 
+     /**
+      * Commits the session: marks it committed, clears per-session FlowFile
+      * tracking and flushes the locally accumulated counters to the shared state.
+      *
+      * @throws FlowFileHandlingException if any FlowFile is still being processed
+      */
+     @Override
+     public void commit() {
+         final boolean hasPending = !beingProcessed.isEmpty();
+         if (hasPending) {
+             throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
+         }
+ 
+         committed = true;
+         beingProcessed.clear();
+         currentVersions.clear();
+         originalVersions.clear();
+ 
+         for (final Map.Entry<String, Long> pendingCounter : counterMap.entrySet()) {
+             final String counterName = pendingCounter.getKey();
+             final Long counterDelta = pendingCounter.getValue();
+             sharedState.adjustCounter(counterName, counterDelta);
+         }
+         counterMap.clear();
+     }
+ 
+     /**
+      * Resets the 'committed' flag so that a test can check whether the next
+      * invocation of {@link org.apache.nifi.processor.Processor#onTrigger}
+      * commits or rolls back the session.
+      */
+     public void clearCommited() {
+         this.committed = false;
+     }
+ 
+     /**
+      * Resets the 'rolledback' flag so that a test can check whether the next
+      * invocation of {@link org.apache.nifi.processor.Processor#onTrigger}
+      * commits or rolls back the session.
+      */
+     public void clearRollback() {
+         this.rolledback = false;
+     }
+ 
+     /**
+      * Creates a new FlowFile with the next id from the shared state and starts tracking it.
+      *
+      * @return the newly created FlowFile
+      */
+     @Override
+     public MockFlowFile create() {
+         final long newId = sharedState.nextFlowFileId();
+         final MockFlowFile created = new MockFlowFile(newId);
+         currentVersions.put(created.getId(), created);
+         beingProcessed.add(created.getId());
+         return created;
+     }
+ 
+     /**
+      * Creates a new FlowFile and passes it through inheritAttributes with the
+      * given parent as the source.
+      *
+      * @param flowFile the parent FlowFile
+      * @return the newly created child FlowFile
+      */
+     @Override
+     public MockFlowFile create(final FlowFile flowFile) {
+         final MockFlowFile child = (MockFlowFile) inheritAttributes(flowFile, create());
+         // Re-register: inheritAttributes returns the instance that is tracked from here on.
+         currentVersions.put(child.getId(), child);
+         beingProcessed.add(child.getId());
+         return child;
+     }
+ 
+     /**
+      * Creates a new FlowFile and passes it through inheritAttributes with the
+      * given parents as the sources.
+      *
+      * @param flowFiles the parent FlowFiles
+      * @return the newly created child FlowFile
+      */
+     @Override
+     public MockFlowFile create(final Collection<FlowFile> flowFiles) {
+         final MockFlowFile child = (MockFlowFile) inheritAttributes(flowFiles, create());
+         // Re-register: inheritAttributes returns the instance that is tracked from here on.
+         currentVersions.put(child.getId(), child);
+         beingProcessed.add(child.getId());
+         return child;
+     }
+ 
+     /**
+      * Writes the content of the given FlowFile to the output stream.
+      *
+      * @param flowFile the FlowFile to export; must be a MockFlowFile
+      * @param out the stream to write the content to
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      * @throws FlowFileAccessException if writing to the stream fails
+      */
+     @Override
+     public void exportTo(final FlowFile flowFile, final OutputStream out) {
+         // NOTE(review): validateState runs before the null check below - confirm how it treats a null FlowFile.
+         validateState(flowFile);
+         if (flowFile == null || out == null) {
+             throw new IllegalArgumentException("arguments cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final byte[] content = ((MockFlowFile) flowFile).getData();
+         try {
+             out.write(content);
+         } catch (final IOException ioe) {
+             throw new FlowFileAccessException(ioe.toString(), ioe);
+         }
+     }
+ 
+     /**
+      * Writes the content of the given FlowFile to a file.
+      *
+      * @param flowFile the FlowFile to export; must be a MockFlowFile
+      * @param path the destination file
+      * @param append if {@code true}, the content is appended to the existing
+      * file; otherwise the file is created if necessary and any existing
+      * content is replaced
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      * @throws FlowFileAccessException if the file cannot be opened or written
+      */
+     @Override
+     public void exportTo(final FlowFile flowFile, final Path path, final boolean append) {
+         validateState(flowFile);
+         if (flowFile == null || path == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final MockFlowFile mock = (MockFlowFile) flowFile;
+ 
+         // CREATE on its own does not truncate: overwriting a longer existing file would
+         // leave its old tail behind. TRUNCATE_EXISTING makes a non-append export replace
+         // the file's content. The append branch is unchanged.
+         final OpenOption[] modes = append
+                 ? new OpenOption[] {StandardOpenOption.APPEND}
+                 : new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING};
+ 
+         try (final OutputStream out = Files.newOutputStream(path, modes)) {
+             out.write(mock.getData());
+         } catch (final IOException e) {
+             throw new FlowFileAccessException(e.toString(), e);
+         }
+     }
+ 
+     /**
+      * Polls the next FlowFile from the processor queue and starts tracking it.
+      *
+      * @return the next FlowFile, or {@code null} if the queue yields none
+      */
+     @Override
+     public MockFlowFile get() {
+         final MockFlowFile polled = processorQueue.poll();
+         if (polled == null) {
+             return null;
+         }
+ 
+         beingProcessed.add(polled.getId());
+         currentVersions.put(polled.getId(), polled);
+         originalVersions.put(polled.getId(), polled);
+         return polled;
+     }
+ 
+     @Override
+     public List<FlowFile> get(final int maxResults) {
+         final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, maxResults));
+         for (int i = 0; i < maxResults; i++) {
+             final MockFlowFile nextFlowFile = get();
+             if (nextFlowFile == null) {
+                 return flowFiles;
+             }
+ 
+             flowFiles.add(nextFlowFile);
+         }
+ 
+         return flowFiles;
+     }
+ 
+     @Override
+     public List<FlowFile> get(final FlowFileFilter filter) {
+         final List<FlowFile> flowFiles = new ArrayList<>();
+         final List<MockFlowFile> unselected = new ArrayList<>();
+ 
+         while (true) {
+             final MockFlowFile flowFile = processorQueue.poll();
+             if (flowFile == null) {
+                 break;
+             }
+ 
+             final FlowFileFilter.FlowFileFilterResult filterResult = filter.filter(flowFile);
+             if (filterResult.isAccept()) {
+                 flowFiles.add(flowFile);
+ 
+                 beingProcessed.add(flowFile.getId());
+                 currentVersions.put(flowFile.getId(), flowFile);
+                 originalVersions.put(flowFile.getId(), flowFile);
+             } else {
+                 unselected.add(flowFile);
+             }
+ 
+             if (!filterResult.isContinue()) {
+                 break;
+             }
+         }
+ 
+         processorQueue.addAll(unselected);
+         return flowFiles;
+     }
+ 
+     /**
+      * @return the size reported by the processor queue
+      */
+     @Override
+     public QueueSize getQueueSize() {
+         final QueueSize queueSize = processorQueue.size();
+         return queueSize;
+     }
+ 
+     /**
+      * Replaces the content of the given FlowFile with the bytes read from the stream.
+      *
+      * @param in the stream to read the new content from
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @return a new version of the FlowFile carrying the imported content
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      * @throws FlowFileAccessException if reading the stream fails
+      */
+     @Override
+     public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) {
+         validateState(flowFile);
+         if (in == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         // The new version is registered before the stream is read.
+         currentVersions.put(updated.getId(), updated);
+ 
+         final byte[] imported;
+         try {
+             imported = readFully(in);
+         } catch (final IOException ioe) {
+             throw new FlowFileAccessException(ioe.toString(), ioe);
+         }
+ 
+         updated.setData(imported);
+         return updated;
+     }
+ 
+     /**
+      * Replaces the content of the given FlowFile with the content of a file and
+      * sets the filename attribute to the file's name.
+      *
+      * @param path the file to read the new content from
+      * @param keepSourceFile not used by this mock
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @return a new version of the FlowFile carrying the imported content
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      * @throws FlowFileAccessException if reading the file fails
+      */
+     @Override
+     public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) {
+         validateState(flowFile);
+         if (path == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         currentVersions.put(updated.getId(), updated);
+ 
+         final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         try {
+             Files.copy(path, buffer);
+         } catch (final IOException ioe) {
+             throw new FlowFileAccessException(ioe.toString(), ioe);
+         }
+         updated.setData(buffer.toByteArray());
+ 
+         final String fileName = path.getFileName().toString();
+         return putAttribute(updated, CoreAttributes.FILENAME.key(), fileName);
+     }
+ 
 -    @Override
 -    public Set<Relationship> getAvailableRelationships() {
 -        return sharedState.getAvailableRelationships();
 -    }
+ 
+     /**
+      * Concatenates the content of the source FlowFiles, in iteration order,
+      * into a new version of the destination FlowFile.
+      *
+      * @param sources the FlowFiles whose content is concatenated
+      * @param destination the FlowFile that receives the merged content
+      * @return a new version of {@code destination} carrying the merged content
+      */
+     @Override
+     public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
+         // Validate everything up front so that nothing is merged if any FlowFile is invalid.
+         for (final FlowFile source : sources) {
+             validateState(source);
+         }
+         validateState(destination);
+ 
+         final ByteArrayOutputStream merged = new ByteArrayOutputStream();
+         for (final FlowFile source : sources) {
+             final byte[] sourceContent = ((MockFlowFile) source).getData();
+             try {
+                 merged.write(sourceContent);
+             } catch (final IOException ioe) {
+                 throw new AssertionError("Failed to write to BAOS");
+             }
+         }
+ 
+         final MockFlowFile result = new MockFlowFile(destination.getId(), destination);
+         result.setData(merged.toByteArray());
+         currentVersions.put(result.getId(), result);
+ 
+         return result;
+     }
+ 
+     /**
+      * Returns a new version of the FlowFile with the given attributes added.
+      *
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @param attrs the attributes to add
+      * @return the new version of the FlowFile
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      */
+     @Override
+     public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attrs) {
+         validateState(flowFile);
+         if (attrs == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         currentVersions.put(updated.getId(), updated);
+         updated.putAttributes(attrs);
+         return updated;
+     }
+ 
+     /**
+      * Returns a new version of the FlowFile with a single attribute added.
+      * Attempting to set the "uuid" attribute fails the current JUnit test.
+      *
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @param attrName name of the attribute
+      * @param attrValue value of the attribute
+      * @return the new version of the FlowFile
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      */
+     @Override
+     public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) {
+         validateState(flowFile);
+         final boolean anyNull = (attrName == null) || (attrValue == null) || (flowFile == null);
+         if (anyNull) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
+         }
+ 
+         if ("uuid".equals(attrName)) {
+             Assert.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         currentVersions.put(updated.getId(), updated);
+ 
+         final Map<String, String> singleAttribute = new HashMap<>();
+         singleAttribute.put(attrName, attrValue);
+         updated.putAttributes(singleAttribute);
+         return updated;
+     }
+ 
+     /**
+      * Passes the content of the given FlowFile to the callback as an input stream.
+      *
+      * @param flowFile the FlowFile to read; must be a MockFlowFile
+      * @param callback receives the content stream
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      * @throws ProcessException if the callback throws an IOException
+      */
+     @Override
+     public void read(final FlowFile flowFile, final InputStreamCallback callback) {
+         if (callback == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+ 
+         validateState(flowFile);
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final byte[] content = ((MockFlowFile) flowFile).getData();
+         final ByteArrayInputStream contentStream = new ByteArrayInputStream(content);
+         try {
+             callback.process(contentStream);
+         } catch (final IOException ioe) {
+             throw new ProcessException(ioe.toString(), ioe);
+         }
+     }
+ 
+     /**
+      * Removes the given FlowFile from this session: it stops being tracked as
+      * in-process, its current version is dropped and the removed count is
+      * incremented.
+      *
+      * @param flowFile the FlowFile to remove
+      * @throws ProcessException if the FlowFile is not currently being processed by this session
+      */
+     @Override
+     public void remove(final FlowFile flowFile) {
+         validateState(flowFile);
+ 
+         // Set.remove reports whether the id was present, so no manual scan of the set is needed.
+         final Long flowFileId = flowFile.getId();
+         if (!beingProcessed.remove(flowFileId)) {
+             throw new ProcessException(flowFile + " not found in queue");
+         }
+ 
+         removedCount++;
+         currentVersions.remove(flowFileId);
+     }
+ 
+     /**
+      * Removes every FlowFile in the collection. All FlowFiles are validated
+      * before the first one is removed.
+      *
+      * @param flowFiles the FlowFiles to remove
+      */
+     @Override
+     public void remove(final Collection<FlowFile> flowFiles) {
+         // First pass: validate only, so that an invalid FlowFile aborts before anything is removed.
+         for (final FlowFile candidate : flowFiles) {
+             validateState(candidate);
+         }
+ 
+         // Second pass: perform the removals.
+         for (final FlowFile candidate : flowFiles) {
+             remove(candidate);
+         }
+     }
+ 
+     /**
+      * Returns a new version of the FlowFile with the named attributes removed.
+      *
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @param attrNames names of the attributes to remove
+      * @return the new version of the FlowFile
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      */
+     @Override
+     public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> attrNames) {
+         validateState(flowFile);
+         if (attrNames == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         currentVersions.put(updated.getId(), updated);
+         updated.removeAttributes(attrNames);
+         return updated;
+     }
+ 
+     /**
+      * Returns a new version of the FlowFile without the attributes whose names
+      * fully match the given pattern.
+      *
+      * @param flowFile the FlowFile to update
+      * @param keyPattern pattern for attribute names; when {@code null} the
+      * FlowFile is returned unchanged
+      * @return the resulting FlowFile
+      * @throws IllegalArgumentException if {@code flowFile} is null
+      */
+     @Override
+     public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
+         validateState(flowFile);
+         if (flowFile == null) {
+             throw new IllegalArgumentException("flowFile cannot be null");
+         }
+         if (keyPattern == null) {
+             return (MockFlowFile) flowFile;
+         }
+ 
+         final Set<String> matchingNames = new HashSet<>();
+         for (final String attributeName : flowFile.getAttributes().keySet()) {
+             final boolean matches = keyPattern.matcher(attributeName).matches();
+             if (matches) {
+                 matchingNames.add(attributeName);
+             }
+         }
+ 
+         return removeAllAttributes(flowFile, matchingNames);
+     }
+ 
+     /**
+      * Returns a new version of the FlowFile with a single attribute removed.
+      *
+      * @param flowFile the FlowFile to update; must be a MockFlowFile
+      * @param attrName name of the attribute to remove
+      * @return the new version of the FlowFile
+      * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
+      */
+     @Override
+     public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) {
+         validateState(flowFile);
+         if (attrName == null || flowFile == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+ 
+         final long sourceId = ((MockFlowFile) flowFile).getId();
+         final MockFlowFile updated = new MockFlowFile(sourceId, flowFile);
+         currentVersions.put(updated.getId(), updated);
+ 
+         final Set<String> singleName = new HashSet<>();
+         singleName.add(attrName);
+         updated.removeAttributes(singleName);
+         return updated;
+     }
+ 
+     /**
+      * Rolls back the session without penalizing; delegates to
+      * {@link #rollback(boolean)} with <code>false</code>.
+      */
+     @Override
+     public void rollback() {
+         final boolean penalize = false;
+         rollback(penalize);
+     }
+ 
+     /**
+      * Rolls back the session: every FlowFile that was transferred, and the
+      * original version of every FlowFile still being processed, is offered
+      * back to the processor queue. All per-session bookkeeping is then
+      * cleared and the session is flagged as rolled back.
+      *
+      * @param penalize not consulted by this implementation
+      */
+     @Override
+     public void rollback(final boolean penalize) {
+         // FlowFiles that had already been routed go back on the queue first
+         for (final List<MockFlowFile> routed : transferMap.values()) {
+             for (final MockFlowFile transferred : routed) {
+                 processorQueue.offer(transferred);
+             }
+         }
+ 
+         // then the untouched originals of whatever was still in flight
+         for (final Long id : beingProcessed) {
+             final MockFlowFile original = originalVersions.get(id);
+             if (original == null) {
+                 continue;
+             }
+             processorQueue.offer(original);
+         }
+ 
+         rolledback = true;
+         beingProcessed.clear();
+         currentVersions.clear();
+         originalVersions.clear();
+         transferMap.clear();
+         clearTransferState();
+     }
+ 
+     /**
+      * Transfers the given FlowFile back to the processor queue.
+      *
+      * @param flowFile the FlowFile to put back on the queue
+      */
+     @Override
+     public void transfer(final FlowFile flowFile) {
+         validateState(flowFile);
+         final boolean isMock = flowFile instanceof MockFlowFile;
+         if (!isMock) {
+             throw new IllegalArgumentException("I only accept MockFlowFile");
+         }
+ 
+         final MockFlowFile requeued = (MockFlowFile) flowFile;
+         beingProcessed.remove(flowFile.getId());
+         processorQueue.offer(requeued);
+     }
+ 
+     /**
+      * Transfers each of the given FlowFiles back to the processor queue, in
+      * iteration order.
+      *
+      * @param flowFiles the FlowFiles to put back on the queue
+      */
+     @Override
+     public void transfer(final Collection<FlowFile> flowFiles) {
+         for (final FlowFile each : flowFiles) {
+             this.transfer(each);
+         }
+     }
+ 
+     /**
+      * Records the given FlowFile as transferred to the given relationship
+      * and marks it as no longer being processed.
+      *
+      * @param flowFile the FlowFile to transfer
+      * @param relationship the relationship the FlowFile is routed to
+      */
+     @Override
+     public void transfer(final FlowFile flowFile, final Relationship relationship) {
+         validateState(flowFile);
+ 
+         // lazily create the bucket for this relationship
+         List<MockFlowFile> routed = transferMap.get(relationship);
+         if (routed == null) {
+             routed = new ArrayList<>();
+             transferMap.put(relationship, routed);
+         }
+ 
+         beingProcessed.remove(flowFile.getId());
+         routed.add((MockFlowFile) flowFile);
+     }
+ 
+     /**
+      * Records all of the given FlowFiles as transferred to the given
+      * relationship. Every FlowFile is validated before any of them is
+      * transferred.
+      *
+      * @param flowFiles the FlowFiles to transfer
+      * @param relationship the relationship the FlowFiles are routed to
+      */
+     @Override
+     public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
+         // validate everything up front so a bad entry leaves nothing half-transferred
+         for (final FlowFile candidate : flowFiles) {
+             validateState(candidate);
+         }
+ 
+         List<MockFlowFile> routed = transferMap.get(relationship);
+         if (routed == null) {
+             routed = new ArrayList<>();
+             transferMap.put(relationship, routed);
+         }
+ 
+         for (final FlowFile accepted : flowFiles) {
+             beingProcessed.remove(accepted.getId());
+             routed.add((MockFlowFile) accepted);
+         }
+     }
+ 
+     /**
+      * Replaces the content of the given FlowFile with whatever the callback
+      * writes, returning a new version of the FlowFile that is registered as
+      * the current version.
+      *
+      * @param flowFile the FlowFile whose content is to be replaced
+      * @param callback the callback that produces the new content
+      * @return the new version of the FlowFile
+      * @throws ProcessException if the callback throws an IOException
+      */
+     @Override
+     public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) {
+         validateState(flowFile);
+         if (flowFile == null || callback == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+         final MockFlowFile current = (MockFlowFile) flowFile;
+ 
+         final ByteArrayOutputStream content = new ByteArrayOutputStream();
+         try {
+             callback.process(content);
+         } catch (final IOException ioe) {
+             throw new ProcessException(ioe.toString(), ioe);
+         }
+ 
+         final MockFlowFile updated = new MockFlowFile(current.getId(), flowFile);
+         currentVersions.put(updated.getId(), updated);
+ 
+         updated.setData(content.toByteArray());
+         return updated;
+     }
+ 
+     /**
+      * Appends whatever the callback writes to the existing content of the
+      * given FlowFile, returning a new version of the FlowFile that is
+      * registered as the current version.
+      *
+      * @param flowFile the FlowFile to append to
+      * @param callback the callback that produces the additional content
+      * @return the new version of the FlowFile
+      * @throws ProcessException if writing the content fails with an
+      * IOException
+      */
+     @Override
+     public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) {
+         validateState(flowFile);
+         if (flowFile == null || callback == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+         final MockFlowFile current = (MockFlowFile) flowFile;
+ 
+         final ByteArrayOutputStream content = new ByteArrayOutputStream();
+         try {
+             // existing bytes first, then let the callback add to them
+             content.write(current.getData());
+             callback.process(content);
+         } catch (final IOException ioe) {
+             throw new ProcessException(ioe.toString(), ioe);
+         }
+ 
+         final MockFlowFile updated = new MockFlowFile(current.getId(), flowFile);
+         currentVersions.put(updated.getId(), updated);
+ 
+         updated.setData(content.toByteArray());
+         return updated;
+     }
+ 
+     /**
+      * Streams the current content of the given FlowFile through the
+      * callback and stores whatever the callback writes as the content of a
+      * new version of the FlowFile, which is registered as the current
+      * version.
+      *
+      * @param flowFile the FlowFile whose content is to be transformed
+      * @param callback the callback that reads the old and writes the new
+      * content
+      * @return the new version of the FlowFile
+      * @throws ProcessException if the callback throws an IOException
+      */
+     @Override
+     public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
+         validateState(flowFile);
+         if (flowFile == null || callback == null) {
+             throw new IllegalArgumentException("argument cannot be null");
+         }
+         if (!(flowFile instanceof MockFlowFile)) {
+             throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+         }
+         final MockFlowFile current = (MockFlowFile) flowFile;
+ 
+         final ByteArrayInputStream existing = new ByteArrayInputStream(current.getData());
+         final ByteArrayOutputStream replacement = new ByteArrayOutputStream();
+         try {
+             callback.process(existing, replacement);
+         } catch (final IOException ioe) {
+             throw new ProcessException(ioe.toString(), ioe);
+         }
+ 
+         final MockFlowFile updated = new MockFlowFile(current.getId(), flowFile);
+         currentVersions.put(updated.getId(), updated);
+         updated.setData(replacement.toByteArray());
+ 
+         return updated;
+     }
+ 
+     /**
+      * Reads the given stream until end-of-stream and returns everything that
+      * was read. The stream is not closed.
+      *
+      * @param in the stream to drain
+      * @return all bytes read from the stream
+      * @throws IOException if reading from the stream fails
+      */
+     private byte[] readFully(final InputStream in) throws IOException {
+         final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+         final byte[] chunk = new byte[4096];
+         for (int read = in.read(chunk); read >= 0; read = in.read(chunk)) {
+             collected.write(chunk, 0, read);
+         }
+ 
+         return collected.toByteArray();
+     }
+ 
+     /**
+      * Returns the FlowFiles that were transferred to the given relationship,
+      * or a new empty list if nothing was transferred to it.
+      *
+      * @param relationship the relationship to look up
+      * @return the FlowFiles transferred to the relationship; never null
+      */
+     public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
+         final List<MockFlowFile> routed = this.transferMap.get(relationship);
+         if (routed != null) {
+             return routed;
+         }
+ 
+         return new ArrayList<>();
+     }
+ 
+     /**
+      * Returns a List of FlowFiles in the order in which they were transferred
+      * to the relationship with the given name
+      *
+      * @param relationship the name of the relationship to look up
+      * @return the FlowFiles transferred to that relationship
+      */
+     public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
+         return getFlowFilesForRelationship(new Relationship.Builder().name(relationship).build());
+     }
+ 
+     /**
+      * Creates a FlowFile whose content is the content of the given file.
+      *
+      * @param file the file to read the content from
+      * @return the newly created FlowFile
+      * @throws IOException if the file cannot be read
+      */
+     public MockFlowFile createFlowFile(final File file) throws IOException {
+         final byte[] contents = Files.readAllBytes(file.toPath());
+         return createFlowFile(contents);
+     }
+ 
+     /**
+      * Creates a FlowFile with the given content.
+      *
+      * @param data the content of the new FlowFile
+      * @return the newly created FlowFile
+      */
+     public MockFlowFile createFlowFile(final byte[] data) {
+         final MockFlowFile created = create();
+         created.setData(data);
+         return created;
+     }
+ 
+     /**
+      * Creates a FlowFile with the given content and adds the given
+      * attributes to it.
+      *
+      * @param data the content of the new FlowFile
+      * @param attrs the attributes to put on the new FlowFile
+      * @return the newly created FlowFile
+      */
+     public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) {
+         final MockFlowFile created = createFlowFile(data);
+         created.putAttributes(attrs);
+         return created;
+     }
+ 
+     /**
+      * Concatenates the content of the source FlowFiles, in iteration order,
+      * into a new version of the destination FlowFile. The optional header is
+      * written first, the optional demarcator between (but not after)
+      * sources, and the optional footer last. The new version is registered
+      * as the current version of the destination.
+      *
+      * @param sources the FlowFiles whose content is merged
+      * @param destination the FlowFile that receives the merged content
+      * @param header bytes written before the first source; may be null
+      * @param footer bytes written after the last source; may be null
+      * @param demarcator bytes written between sources; may be null
+      * @return the new version of the destination FlowFile
+      */
+     @Override
+     public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
+         for (final FlowFile flowFile : sources) {
+             validateState(flowFile);
+         }
+         validateState(destination);
+ 
+         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         try {
+             if (header != null) {
+                 baos.write(header);
+             }
+ 
+             int count = 0;
+             for (final FlowFile flowFile : sources) {
+                 baos.write(((MockFlowFile) flowFile).getData());
+                 // no demarcator after the final source
+                 if (demarcator != null && ++count != sources.size()) {
+                     baos.write(demarcator);
+                 }
+             }
+ 
+             if (footer != null) {
+                 baos.write(footer);
+             }
+         } catch (final IOException e) {
+             // keep the cause so an unexpected failure can still be diagnosed
+             throw new AssertionError("failed to write data to BAOS", e);
+         }
+ 
+         final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
+         newFlowFile.setData(baos.toByteArray());
+         currentVersions.put(newFlowFile.getId(), newFlowFile);
+ 
+         return newFlowFile;
+     }
+ 
+     /**
+      * Verifies that the given FlowFile may still be operated on in this
+      * session: it must be non-null, known to the session, the most recent
+      * version of itself, and not yet transferred to any relationship.
+      *
+      * @param flowFile the FlowFile to check
+      * @throws NullPointerException if the FlowFile is null
+      * @throws FlowFileHandlingException if the FlowFile is unknown or stale
+      * @throws IllegalStateException if the FlowFile was already transferred
+      */
+     private void validateState(final FlowFile flowFile) {
+         Objects.requireNonNull(flowFile);
+ 
+         final FlowFile latest = currentVersions.get(flowFile.getId());
+         if (latest == null) {
+             throw new FlowFileHandlingException(flowFile + " is not known in this session");
+         }
+         // identity comparison on purpose: only the exact current instance is accepted
+         if (latest != flowFile) {
+             throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session");
+         }
+ 
+         for (final List<MockFlowFile> routed : transferMap.values()) {
+             if (!routed.contains(flowFile)) {
+                 continue;
+             }
+             throw new IllegalStateException(flowFile + " has already been transferred");
+         }
+     }
+ 
+     /**
+      * Inherits the attributes from the given source flow file into another flow
+      * file. The UUID of the source becomes the parent UUID of this flow file.
+      * If a parent uuid had previously been established it will be replaced by
+      * the uuid of the given source
+      *
+      * @param source the FlowFile from which to copy attributes
+      * @param destination the FlowFile to which to copy attributes
+      */
+     private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) {
+         if (source == null || destination == null || source == destination) {
+             return destination; //don't need to inherit from ourselves
+         }
+         FlowFile updated = putAllAttributes(destination, source.getAttributes());
+         getProvenanceReporter().fork(source, Collections.singletonList(updated));
+         return updated;
+     }
+ 
+     /**
+      * Inherits the attributes from the given source flow files into the
+      * destination flow file. Only attributes that are common to all source
+      * items (same key and same value) are copied onto the destination. A
+      * join from the sources to the updated destination is then reported to
+      * the provenance reporter.
+      *
+      * @param sources the FlowFiles from which to inherit common attributes
+      * @param destination the FlowFile to which to copy attributes
+      * @return the updated destination FlowFile
+      */
+     private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) {
+         // The previous implementation also assembled a comma-separated list of
+         // source UUIDs here, but the value was never used; that dead
+         // computation has been dropped.
+         final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources));
+         getProvenanceReporter().join(sources, updated);
+         return updated;
+     }
+ 
+     /**
+      * Returns the attributes that are common to every flow file given. The key
+      * and value must match exactly.
+      *
+      * @param flowFileList a list of flow files; may be null or empty, in
+      * which case an empty map is returned
+      *
+      * @return a new map holding the common attributes
+      */
+     private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
+         final Map<String, String> result = new HashMap<>();
+         //trivial cases
+         if (flowFileList == null || flowFileList.isEmpty()) {
+             return result;
+         }
+ 
+         final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
+         if (flowFileList.size() == 1) {
+             // a single flow file trivially shares all of its attributes with
+             // itself; return right away instead of re-deriving the same map
+             result.putAll(firstMap);
+             return result;
+         }
+ 
+         /*
+          * Start with the first attribute map and only put an entry to the
+          * resultant map if it is common to every map.
+          */
+         outer:
+         for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+             final String key = mapEntry.getKey();
+             final String value = mapEntry.getValue();
+             for (final FlowFile flowFile : flowFileList) {
+                 final Map<String, String> currMap = flowFile.getAttributes();
+                 final String curVal = currMap.get(key);
+                 if (curVal == null || !curVal.equals(value)) {
+                     continue outer;
+                 }
+             }
+             result.put(key, value);
+         }
+ 
+         return result;
+     }
+ 
+     /**
+      * Fails the test unless {@link #commit()} has been called on this session
+      */
+     public void assertCommitted() {
+         if (!committed) {
+             Assert.fail("Session was not committed");
+         }
+     }
+ 
+     /**
+      * Fails the test if {@link #commit()} has been called on this session
+      */
+     public void assertNotCommitted() {
+         if (committed) {
+             Assert.fail("Session was committed");
+         }
+     }
+ 
+     /**
+      * Fails the test unless {@link #rollback()} has been called on this
+      * session
+      */
+     public void assertRolledBack() {
+         if (!rolledback) {
+             Assert.fail("Session was not rolled back");
+         }
+     }
+ 
+     /**
+      * Fails the test if {@link #rollback()} has been called on this session
+      */
+     public void assertNotRolledBack() {
+         if (rolledback) {
+             Assert.fail("Session was rolled back");
+         }
+     }
+ 
+     /**
+      * Assert that the number of FlowFiles transferred to the given relationship
+      * is equal to the given count
+      *
+      * @param relationship the relationship whose transfers are counted
+      * @param count the expected number of FlowFiles
+      */
+     public void assertTransferCount(final Relationship relationship, final int count) {
+         final int transferCount = getFlowFilesForRelationship(relationship).size();
+         final String failureMessage = "Expected " + count + " FlowFiles to be transferred to "
+                 + relationship + " but actual transfer count was " + transferCount;
+         Assert.assertEquals(failureMessage, count, transferCount);
+     }
+ 
+     /**
+      * Assert that the number of FlowFiles transferred to the relationship with
+      * the given name is equal to the given count
+      *
+      * @param relationship the name of the relationship whose transfers are
+      * counted
+      * @param count the expected number of FlowFiles
+      */
+     public void assertTransferCount(final String relationship, final int count) {
+         final Relationship named = new Relationship.Builder().name(relationship).build();
+         assertTransferCount(named, count);
+     }
+ 
+     /**
+      * Fails the test if any FlowFile is left on the input queue.
+      */
+     public void assertQueueEmpty() {
+         final String failureMessage = "FlowFile Queue has " + this.processorQueue.size() + " items";
+         Assert.assertTrue(failureMessage, this.processorQueue.isEmpty());
+     }
+ 
+     /**
+      * Fails the test unless at least one FlowFile is on the input queue
+      */
+     public void assertQueueNotEmpty() {
+         if (this.processorQueue.isEmpty()) {
+             Assert.fail("FlowFile Queue is empty");
+         }
+     }
+ 
+     /**
+      * Asserts that all FlowFiles that were transferred were transferred to the
+      * relationship with the given name
+      *
+      * @param relationship the name of the only relationship that may have
+      * received FlowFiles
+      */
+     public void assertAllFlowFilesTransferred(final String relationship) {
+         final Relationship named = new Relationship.Builder().name(relationship).build();
+         assertAllFlowFilesTransferred(named);
+     }
+ 
+     /**
+      * Asserts that all FlowFiles that were transferred were transferred to the
+      * given relationship, i.e. that no other relationship received any
+      * FlowFile
+      *
+      * @param relationship the only relationship that may have received
+      * FlowFiles
+      */
+     public void assertAllFlowFilesTransferred(final Relationship relationship) {
+         for (final Map.Entry<Relationship, List<MockFlowFile>> transfer : transferMap.entrySet()) {
+             final Relationship actual = transfer.getKey();
+             final List<MockFlowFile> routed = transfer.getValue();
+ 
+             if (actual.equals(relationship)) {
+                 continue;
+             }
+             if (routed == null || routed.isEmpty()) {
+                 continue;
+             }
+ 
+             Assert.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + routed.size() + " were routed to " + actual);
+         }
+     }
+ 
+     /**
+      * Forgets which FlowFiles were transferred to which relationship by
+      * emptying the transfer map
+      */
+     public void clearTransferState() {
+         transferMap.clear();
+     }
+ 
+     /**
+      * Asserts that all FlowFiles that were transferred were transferred to the
+      * given relationship and that the number of FlowFiles transferred is equal
+      * to <code>count</code>
+      *
+      * @param relationship the only relationship that may have received
+      * FlowFiles
+      * @param count the expected number of FlowFiles on that relationship
+      */
+     public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
+         // first: nothing went anywhere else; second: the right amount arrived
+         this.assertAllFlowFilesTransferred(relationship);
+         this.assertTransferCount(relationship, count);
+     }
+ 
+     /**
+      * Asserts that all FlowFiles that were transferred were transferred to the
+      * relationship with the given name and that the number of FlowFiles
+      * transferred is equal to <code>count</code>
+      *
+      * @param relationship the name of the only relationship that may have
+      * received FlowFiles
+      * @param count the expected number of FlowFiles on that relationship
+      */
+     public void assertAllFlowFilesTransferred(final String relationship, final int count) {
+         final Relationship named = new Relationship.Builder().name(relationship).build();
+         assertAllFlowFilesTransferred(named, count);
+     }
+ 
+     /**
+      * Returns the number of FlowFiles that were removed
+      *
+      * @return the current value of the removed-FlowFile counter
+      */
+     public int getRemovedCount() {
+         return this.removedCount;
+     }
+ 
+     /**
+      * Returns the provenance reporter held by the shared session state.
+      *
+      * @return the provenance reporter
+      */
+     @Override
+     public ProvenanceReporter getProvenanceReporter() {
+         final ProvenanceReporter reporter = sharedState.getProvenanceReporter();
+         return reporter;
+     }
+ 
+     /**
+      * Marks the given FlowFile as penalized. The same instance is modified
+      * and returned; no new version is created.
+      *
+      * @param flowFile the FlowFile to penalize
+      * @return the given FlowFile, now penalized
+      */
+     @Override
+     public MockFlowFile penalize(final FlowFile flowFile) {
+         validateState(flowFile);
+ 
+         final MockFlowFile penalized = (MockFlowFile) flowFile;
+         penalized.setPenalized();
+         return penalized;
+     }
+ 
+     /**
+      * Returns the content of the given FlowFile after checking that it is
+      * still valid within this session.
+      *
+      * @param flowFile the FlowFile whose content is requested
+      * @return the data held by the FlowFile
+      */
+     public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+         validateState(flowFile);
+         final byte[] content = flowFile.getData();
+         return content;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index 0000000,96bef71..13a87de
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@@ -1,0 -1,91 +1,72 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.util;
+ 
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import org.apache.nifi.processor.Processor;
 -import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ 
+ /**
+  * State shared between mock sessions: the FlowFile queue, the provenance
+  * reporter, the FlowFile id generator and a set of named counters.
+  */
+ public class SharedSessionState {
+ 
+     private final MockFlowFileQueue flowFileQueue;
+     private final ProvenanceReporter provenanceReporter;
++    @SuppressWarnings("unused")
+     private final Processor processor;
+     private final AtomicLong flowFileIdGenerator;
+     private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+ 
 -    private volatile Set<Relationship> unavailableRelationships;
+ 
+     /**
+      * Creates the shared state with a new, empty MockFlowFileQueue and a new
+      * MockProvenanceReporter.
+      *
+      * @param processor the processor this state belongs to; it is stored
+      * but not otherwise used by this class
+      * @param flowFileIdGenerator the generator that FlowFile ids are drawn
+      * from
+      */
+     public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
+         flowFileQueue = new MockFlowFileQueue();
+         provenanceReporter = new MockProvenanceReporter();
 -        unavailableRelationships = new HashSet<>();
+         this.flowFileIdGenerator = flowFileIdGenerator;
+         this.processor = processor;
+     }
+ 
 -    public Set<Relationship> getAvailableRelationships() {
 -        final Set<Relationship> relationships = new HashSet<>(processor.getRelationships());
 -        relationships.removeAll(unavailableRelationships);
 -        return relationships;
 -    }
 -
 -    public void setUnavailableRelationships(final Set<Relationship> relationships) {
 -        this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
 -    }
 -
 -    public Set<Relationship> getUnavailableRelationships() {
 -        return unavailableRelationships;
 -    }
 -
+     /**
+      * @return the FlowFile queue created for this state
+      */
+     public MockFlowFileQueue getFlowFileQueue() {
+         return flowFileQueue;
+     }
+ 
+     /**
+      * @return the provenance reporter created for this state
+      */
+     public ProvenanceReporter getProvenanceReporter() {
+         return provenanceReporter;
+     }
+ 
+     /**
+      * @return the generator's current value, which is then incremented
+      */
+     public long nextFlowFileId() {
+         return flowFileIdGenerator.getAndIncrement();
+     }
+ 
+     /**
+      * Adds the given delta to the named counter, creating the counter with
+      * an initial value of zero if it does not exist yet.
+      *
+      * @param name the name of the counter
+      * @param delta the amount to add to the counter
+      */
+     public void adjustCounter(final String name, final long delta) {
+         AtomicLong counter = counterMap.get(name);
+         if (counter == null) {
+             counter = new AtomicLong(0L);
+             // if another caller registered a counter in the meantime, use theirs
+             AtomicLong existingCounter = counterMap.putIfAbsent(name, counter);
+             if (existingCounter != null) {
+                 counter = existingCounter;
+             }
+         }
+ 
+         counter.addAndGet(delta);
+     }
+ 
+     /**
+      * @param name the name of the counter
+      * @return the current value of the counter, or null if no counter with
+      * that name has been adjusted yet
+      */
+     public Long getCounterValue(final String name) {
+         final AtomicLong counterValue = counterMap.get(name);
+         return (counterValue == null) ? null : counterValue.get();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0000000,54b611d..40d5035
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@@ -1,0 -1,492 +1,492 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.util;
+ 
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.io.ByteArrayInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.lang.reflect.InvocationTargetException;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.annotation.OnConfigured;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.OnAdded;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnShutdown;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.processor.annotation.TriggerSerially;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ import org.apache.nifi.reporting.InitializationException;
+ 
+ import org.junit.Assert;
+ 
+ public class StandardProcessorTestRunner implements TestRunner {
+ 
+     private final Processor processor;
+     private final MockProcessContext context;
+     private final MockFlowFileQueue flowFileQueue;
+     private final MockSessionFactory sessionFactory;
+     private final SharedSessionState sharedState;
+     private final AtomicLong idGenerator;
+     private final boolean triggerSerially;
+ 
+     private int numThreads = 1;
+     private final AtomicInteger invocations = new AtomicInteger(0);
+ 
+     /**
+      * Creates a runner for the given processor: builds the shared session
+      * state, session factory and process context, initializes the processor
+      * and invokes its @OnAdded methods.
+      *
+      * @param processor the processor under test
+      */
+     StandardProcessorTestRunner(final Processor processor) {
+         this.processor = processor;
+         this.idGenerator = new AtomicLong();
+         this.sharedState = new SharedSessionState(processor, idGenerator);
+         this.flowFileQueue = sharedState.getFlowFileQueue();
+         this.sessionFactory = new MockSessionFactory(sharedState);
+         this.context = new MockProcessContext(processor);
+ 
+         processor.initialize(new MockProcessorInitializationContext(processor, context));
+ 
+         try {
+             ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+         } catch (Exception failure) {
+             Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + failure);
+         }
+ 
+         triggerSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class);
+     }
+ 
+     /**
+      * Passes the given flag on to the process context.
+      *
+      * @param validate the value handed to the context's
+      * setValidateExpressionUsage
+      */
+     @Override
+     public void setValidateExpressionUsage(final boolean validate) {
+         this.context.setValidateExpressionUsage(validate);
+     }
+ 
+     /**
+      * @return the processor this runner was created for
+      */
+     @Override
+     public Processor getProcessor() {
+         return this.processor;
+     }
+ 
+     /**
+      * @return the mock process context used by this runner
+      */
+     @Override
+     public MockProcessContext getProcessContext() {
+         return this.context;
+     }
+ 
+     /**
+      * Runs the processor for a single iteration.
+      */
+     @Override
+     public void run() {
+         final int singleIteration = 1;
+         run(singleIteration);
+     }
+ 
+     /**
+      * Runs the processor for the given number of iterations, stopping it
+      * when finished.
+      *
+      * @param iterations the number of times to trigger the processor
+      */
+     @Override
+     public void run(int iterations) {
+         final boolean stopOnFinish = true;
+         run(iterations, stopOnFinish);
+     }
+ 
+     /**
+      * Runs the processor for the given number of iterations, invoking its
+      * @OnScheduled methods first.
+      *
+      * @param iterations the number of times to trigger the processor
+      * @param stopOnFinish whether @OnStopped methods are invoked afterwards
+      */
+     @Override
+     public void run(final int iterations, final boolean stopOnFinish) {
+         final boolean initialize = true;
+         run(iterations, stopOnFinish, initialize);
+     }
+     
+     /**
+      * Triggers the processor <code>iterations</code> times on a fixed pool of
+      * <code>numThreads</code> threads and waits for every invocation to
+      * finish.
+      * <p>
+      * The context must be valid. Methods annotated with @OnScheduled are
+      * invoked first when <code>initialize</code> is true. Methods annotated
+      * with @OnUnscheduled are invoked once: after the first invocation that
+      * completes without error, or after the loop if none did. Methods
+      * annotated with @OnStopped are invoked last when
+      * <code>stopOnFinish</code> is true. Expression validation is enabled
+      * for the duration of the call.
+      *
+      * @param iterations the number of times to trigger the processor; must
+      * be at least 1
+      * @param stopOnFinish whether @OnStopped methods are invoked afterwards
+      * @param initialize whether @OnScheduled methods are invoked beforehand
+      * @throws IllegalArgumentException if iterations is less than 1
+      * @throws AssertionError if an invocation of the processor throws, or a
+      * lifecycle method cannot be invoked
+      */
+     @Override
+     public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
+         if (iterations < 1) {
+             throw new IllegalArgumentException("iterations must be at least 1 but was " + iterations);
+         }
+ 
+         context.assertValid();
+         context.enableExpressionValidation();
+         try {
+             if ( initialize ) {
+                 try {
+                     ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
+                 } catch (Exception e) {
+                     e.printStackTrace();
+                     Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e);
+                 }
+             }
+ 
+             final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+             @SuppressWarnings("unchecked")
+             final Future<Throwable>[] futures = new Future[iterations];
+             for (int i = 0; i < iterations; i++) {
+                 final Future<Throwable> future = executorService.submit(new RunProcessor());
+                 futures[i] = future;
+             }
+ 
+             // no further tasks are accepted; the submitted ones still run to completion
+             executorService.shutdown();
+ 
+             int finishedCount = 0;
+             boolean unscheduledRun = false;
+             for (final Future<Throwable> future : futures) {
+                 try {
+                     final Throwable thrown = future.get();   // wait for the result
+                     if (thrown != null) {
+                         throw new AssertionError(thrown);
+                     }
+ 
+                     if (++finishedCount == 1) {
+                         unscheduledRun = true;
+                         try {
+                             ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+                         } catch (Exception e) {
+                             Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+                         }
+                     }
+                 } catch (final InterruptedException e) {
+                     // keep going, but do not lose the interrupt for our caller
+                     Thread.currentThread().interrupt();
+                 } catch (final java.util.concurrent.ExecutionException e) {
+                     // RunProcessor returns failures instead of throwing them, so
+                     // this is unexpected; surface it rather than hiding it
+                     throw new AssertionError(e.getCause() == null ? e : e.getCause());
+                 }
+             }
+ 
+             if (!unscheduledRun) {
+                 try {
+                     ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+                 } catch (Exception e) {
+                     Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+                 }
+             }
+ 
+             if (stopOnFinish) {
+                 try {
+                     ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor);
+                 } catch (Exception e) {
+                     Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
+                 }
+             }
+         } finally {
+             context.disableExpressionValidation();
+         }
+     }
+ 
+     /**
+      * Invokes the processor's methods annotated with @OnShutdown, failing
+      * the test if they cannot be invoked.
+      */
+     @Override
+     public void shutdown() {
+         try {
+             ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor);
+         } catch (Exception failure) {
+             final String reason = "Could not invoke methods annotated with @OnShutdown annotation due to: " + failure;
+             Assert.fail(reason);
+         }
+     }
+ 
+     private class RunProcessor implements Callable<Throwable> {
+ 
+         @Override
+         public Throwable call() throws Exception {
+             invocations.incrementAndGet();
+             try {
+                 processor.onTrigger(context, sessionFactory);
+             } catch (final Throwable t) {
+                 return t;
+             }
+ 
+             return null;
+         }
+     }
+ 
+     /**
+      * @return the session factory held by this runner
+      */
+     @Override
+     public ProcessSessionFactory getProcessSessionFactory() {
+         return this.sessionFactory;
+     }
+ 
+     /**
+      * Runs the same-named assertion on every session the session factory has
+      * created.
+      *
+      * @param relationship name of the relationship passed to each session's check
+      */
+     @Override
+     public void assertAllFlowFilesTransferred(final String relationship) {
+         for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+             createdSession.assertAllFlowFilesTransferred(relationship);
+         }
+     }
+ 
+     /**
+      * Runs the same-named assertion on every session the session factory has
+      * created.
+      *
+      * @param relationship the relationship passed to each session's check
+      */
+     @Override
+     public void assertAllFlowFilesTransferred(final Relationship relationship) {
+         for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+             createdSession.assertAllFlowFilesTransferred(relationship);
+         }
+     }
+ 
+     /**
+      * Combines two checks, in this order: the single-argument
+      * {@code assertAllFlowFilesTransferred} and then {@code assertTransferCount}.
+      *
+      * @param relationship name of the relationship to check
+      * @param count expected number of FlowFiles for that relationship
+      */
+     @Override
+     public void assertAllFlowFilesTransferred(final String relationship, final int count) {
+         this.assertAllFlowFilesTransferred(relationship);
+         this.assertTransferCount(relationship, count);
+     }
+ 
+     /**
+      * Combines two checks, in this order: the single-argument
+      * {@code assertAllFlowFilesTransferred} and then {@code assertTransferCount}.
+      *
+      * @param relationship the relationship to check
+      * @param count expected number of FlowFiles for that relationship
+      */
+     @Override
+     public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
+         this.assertAllFlowFilesTransferred(relationship);
+         this.assertTransferCount(relationship, count);
+     }
+ 
+     /**
+      * Asserts that the list returned by {@code getFlowFilesForRelationship}
+      * has exactly {@code count} entries.
+      *
+      * @param relationship the relationship whose FlowFiles are counted
+      * @param count the expected number of FlowFiles
+      */
+     @Override
+     public void assertTransferCount(final Relationship relationship, final int count) {
+         final int actual = getFlowFilesForRelationship(relationship).size();
+         Assert.assertEquals(count, actual);
+     }
+ 
+     /**
+      * Asserts that the list returned by {@code getFlowFilesForRelationship}
+      * has exactly {@code count} entries.
+      *
+      * @param relationship name of the relationship whose FlowFiles are counted
+      * @param count the expected number of FlowFiles
+      */
+     @Override
+     public void assertTransferCount(final String relationship, final int count) {
+         final int actual = getFlowFilesForRelationship(relationship).size();
+         Assert.assertEquals(count, actual);
+     }
+ 
+     /**
+      * Delegates to the context's {@code assertValid()}.
+      */
+     @Override
+     public void assertValid() {
+         this.context.assertValid();
+     }
+ 
+     /**
+      * Fails the test when the context reports itself as valid.
+      */
+     @Override
+     public void assertNotValid() {
+         final boolean valid = context.isValid();
+         Assert.assertFalse("Processor appears to be valid but expected it to be invalid", valid);
+     }
+ 
+     /**
+      * @return whatever the FlowFile queue's {@code isEmpty()} reports
+      */
+     @Override
+     public boolean isQueueEmpty() {
+         final boolean empty = flowFileQueue.isEmpty();
+         return empty;
+     }
+ 
+     /**
+      * Fails the test unless the FlowFile queue reports itself as empty.
+      */
+     @Override
+     public void assertQueueEmpty() {
+         final boolean empty = flowFileQueue.isEmpty();
+         Assert.assertTrue(empty);
+     }
+ 
+     /**
+      * Fails the test when the FlowFile queue reports itself as empty.
+      */
+     @Override
+     public void assertQueueNotEmpty() {
+         final boolean empty = flowFileQueue.isEmpty();
+         Assert.assertFalse(empty);
+     }
+ 
+     /**
+      * Calls {@code clearTransferState()} on every session the session factory
+      * has created.
+      */
+     @Override
+     public void clearTransferState() {
+         for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+             createdSession.clearTransferState();
+         }
+     }
+ 
+     /**
+      * Offers each given FlowFile to the FlowFile queue, in argument order.
+      * Every element is cast to {@code MockFlowFile} before being offered.
+      *
+      * @param flowFiles the FlowFiles to enqueue
+      */
+     @Override
+     public void enqueue(final FlowFile... flowFiles) {
+         for (int i = 0; i < flowFiles.length; i++) {
+             final MockFlowFile mockFlowFile = (MockFlowFile) flowFiles[i];
+             flowFileQueue.offer(mockFlowFile);
+         }
+     }
+ 
+     /**
+      * Enqueues the file at the given path, supplying an empty attribute map.
+      *
+      * @param path the file to enqueue
+      * @throws IOException propagated from the two-argument overload
+      */
+     @Override
+     public void enqueue(final Path path) throws IOException {
+         final Map<String, String> noAttributes = new HashMap<String, String>();
+         enqueue(path, noAttributes);
+     }
+ 
+     /**
+      * Enqueues the content of the file at the given path. The attributes are
+      * copied first; if the copy has no filename attribute, the file's name is
+      * added under that key. The caller's map is never modified.
+      *
+      * @param path the file whose content is enqueued
+      * @param attributes attributes to attach to the enqueued content
+      * @throws IOException if the file cannot be opened or closed
+      */
+     @Override
+     public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+         final Map<String, String> withFilename = new HashMap<>(attributes);
+         final String filenameKey = CoreAttributes.FILENAME.key();
+         if (!withFilename.containsKey(filenameKey)) {
+             withFilename.put(filenameKey, path.toFile().getName());
+         }
+ 
+         // The stream is closed as soon as the content has been handed over.
+         try (final InputStream content = Files.newInputStream(path)) {
+             enqueue(content, withFilename);
+         }
+     }
+ 
+     /**
+      * Enqueues the given bytes, supplying an empty attribute map.
+      *
+      * @param data the content to enqueue
+      */
+     @Override
+     public void enqueue(final byte[] data) {
+         final Map<String, String> noAttributes = new HashMap<String, String>();
+         enqueue(data, noAttributes);
+     }
+ 
+     /**
+      * Wraps the given bytes in a {@link ByteArrayInputStream} and enqueues
+      * that stream with the given attributes.
+      *
+      * @param data the content to enqueue
+      * @param attributes attributes to attach to the enqueued content
+      */
+     @Override
+     public void enqueue(final byte[] data, final Map<String, String> attributes) {
+         final InputStream content = new ByteArrayInputStream(data);
+         enqueue(content, attributes);
+     }
+ 
+     /**
+      * Enqueues the content of the given stream, supplying an empty attribute map.
+      *
+      * @param data stream providing the content to enqueue
+      */
+     @Override
+     public void enqueue(final InputStream data) {
+         final Map<String, String> noAttributes = new HashMap<String, String>();
+         enqueue(data, noAttributes);
+     }
+ 
+     /**
+      * Builds a FlowFile from the given stream and attributes and enqueues it.
+      * A new session over a new shared session state is created for each call;
+      * the FlowFile is created, filled from the stream, then given the attributes.
+      *
+      * @param data stream providing the FlowFile content
+      * @param attributes attributes to put on the FlowFile
+      */
+     @Override
+     public void enqueue(final InputStream data, final Map<String, String> attributes) {
+         final SharedSessionState state = new SharedSessionState(processor, idGenerator);
+         final MockProcessSession session = new MockProcessSession(state);
+ 
+         final MockFlowFile created = session.create();
+         final MockFlowFile withContent = session.importFrom(data, created);
+         final MockFlowFile withAttributes = session.putAllAttributes(withContent, attributes);
+ 
+         enqueue(withAttributes);
+     }
+ 
+     /**
+      * @param flowFile the FlowFile to read
+      * @return the value of the FlowFile's {@code getData()}
+      */
+     @Override
+     public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+         final byte[] content = flowFile.getData();
+         return content;
+     }
+ 
+     /**
+      * Builds a {@code Relationship} with the given name and delegates to the
+      * overload that takes a {@code Relationship}.
+      *
+      * @param relationship name of the relationship to look up
+      * @return the result of the {@code Relationship} overload
+      */
+     @Override
+     public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
+         return getFlowFilesForRelationship(new Relationship.Builder().name(relationship).build());
+     }
+ 
+     /**
+      * Gathers the FlowFiles for the given relationship from every session the
+      * session factory has created.
+      *
+      * @param relationship the relationship to look up
+      * @return a new list of the gathered FlowFiles, sorted by ascending creation time
+      */
+     @Override
+     public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
+         final List<MockFlowFile> collected = new ArrayList<>();
+         for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+             collected.addAll(createdSession.getFlowFilesForRelationship(relationship));
+         }
+ 
+         // Order by creation time, oldest first.
+         final Comparator<MockFlowFile> byCreationTime = new Comparator<MockFlowFile>() {
+             @Override
+             public int compare(final MockFlowFile first, final MockFlowFile second) {
+                 return Long.compare(first.getCreationTime(), second.getCreationTime());
+             }
+         };
+         Collections.sort(collected, byCreationTime);
+ 
+         return collected;
+     }
+ 
+     /**
+      * @return the provenance reporter obtained from the shared state
+      */
+     @Override
+     public ProvenanceReporter getProvenanceReporter() {
+         return this.sharedState.getProvenanceReporter();
+     }
+ 
+     /**
+      * @return the value of the FlowFile queue's {@code size()}
+      */
+     @Override
+     public QueueSize getQueueSize() {
+         return this.flowFileQueue.size();
+     }
+ 
+     /**
+      * @param name the counter name to look up in the shared state
+      * @return the value the shared state reports for that counter
+      */
+     @Override
+     public Long getCounterValue(final String name) {
+         return this.sharedState.getCounterValue(name);
+     }
+ 
+     /**
+      * @return the sum of {@code getRemovedCount()} over every session the
+      *         session factory has created
+      */
+     @Override
+     public int getRemovedCount() {
+         int totalRemoved = 0;
+         for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+             totalRemoved += createdSession.getRemovedCount();
+         }
+ 
+         return totalRemoved;
+     }
+ 
+     /**
+      * Forwards the annotation data to the context.
+      *
+      * @param annotationData the value passed to the context
+      */
+     @Override
+     public void setAnnotationData(final String annotationData) {
+         this.context.setAnnotationData(annotationData);
+     }
+ 
+     /**
+      * Sets a property on the context by name.
+      *
+      * @param propertyName name of the property to set
+      * @param propertyValue value to assign
+      * @return the validation result returned by the context
+      */
+     @Override
+     public ValidationResult setProperty(final String propertyName, final String propertyValue) {
+         final ValidationResult result = context.setProperty(propertyName, propertyValue);
+         return result;
+     }
+ 
+     /**
+      * Sets a property on the context by descriptor.
+      *
+      * @param descriptor descriptor of the property to set
+      * @param value value to assign
+      * @return the validation result returned by the context
+      */
+     @Override
+     public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
+         final ValidationResult result = context.setProperty(descriptor, value);
+         return result;
+     }
+ 
+     /**
+      * Sets a property on the context using the allowable value's
+      * {@code getValue()} as the value.
+      *
+      * @param descriptor descriptor of the property to set
+      * @param value allowable value whose underlying value is assigned
+      * @return the validation result returned by the context
+      */
+     @Override
+     public ValidationResult setProperty(final PropertyDescriptor descriptor, final AllowableValue value) {
+         final String rawValue = value.getValue();
+         return context.setProperty(descriptor, rawValue);
+     }
+ 
+     /**
+      * Stores the number of threads to use. Fails the test if more than one
+      * thread is requested while the processor is triggered serially.
+      *
+      * @param threadCount the thread count to store
+      */
+     @Override
+     public void setThreadCount(final int threadCount) {
+         final boolean wantsMultipleThreads = threadCount > 1;
+         if (triggerSerially && wantsMultipleThreads) {
+             Assert.fail("Cannot set thread-count higher than 1 because the processor is triggered serially");
+         }
+ 
+         this.numThreads = threadCount;
+     }
+ 
+     /**
+      * @return the stored thread count
+      */
+     @Override
+     public int getThreadCount() {
+         return this.numThreads;
+     }
+ 
+     /**
+      * Marks the given relationship as available: copies the current set of
+      * unavailable relationships, removes the relationship from the copy, and
+      * stores the copy back.
+      *
+      * @param relationship the relationship to remove from the unavailable set
+      */
+     @Override
+     public void setRelationshipAvailable(final Relationship relationship) {
 -        final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
++        final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
+         unavailable.remove(relationship);
 -        sharedState.setUnavailableRelationships(unavailable);
++        context.setUnavailableRelationships(unavailable);
+     }
+ 
+     /**
+      * Builds a {@code Relationship} with the given name and delegates to the
+      * overload that takes a {@code Relationship}.
+      *
+      * @param relationshipName name of the relationship to make available
+      */
+     @Override
+     public void setRelationshipAvailable(final String relationshipName) {
+         final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+         setRelationshipAvailable(relationship);
+     }
+ 
+     /**
+      * Marks the given relationship as unavailable: copies the current set of
+      * unavailable relationships, adds the relationship to the copy, and stores
+      * the copy back.
+      *
+      * @param relationship the relationship to add to the unavailable set
+      */
+     @Override
+     public void setRelationshipUnavailable(final Relationship relationship) {
 -        final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
++        final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
+         unavailable.add(relationship);
 -        sharedState.setUnavailableRelationships(unavailable);
++        context.setUnavailableRelationships(unavailable);
+     }
+ 
+     /**
+      * Builds a {@code Relationship} with the given name and delegates to the
+      * overload that takes a {@code Relationship}.
+      *
+      * @param relationshipName name of the relationship to make unavailable
+      */
+     @Override
+     public void setRelationshipUnavailable(final String relationshipName) {
+         final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+         setRelationshipUnavailable(relationship);
+     }
+ 
+     /**
+      * Registers a controller service, supplying an empty property map.
+      *
+      * @param identifier identifier for the service
+      * @param service the service to register
+      * @throws InitializationException propagated from the three-argument overload
+      */
+     @Override
+     public void addControllerService(final String identifier, final ControllerService service) throws InitializationException {
+         final Map<String, String> noProperties = new HashMap<String, String>();
+         addControllerService(identifier, service, noProperties);
+     }
+ 
+     /**
+      * Initializes the given controller service, invokes its methods annotated
+      * with {@link OnConfigured}, and registers it with the context.
+      *
+      * @param identifier identifier for the service; must not be null
+      * @param service the service to register; must not be null
+      * @param properties property values keyed by property name
+      * @throws InitializationException if invoking the {@code OnConfigured}
+      *         methods fails with one of the caught reflection exceptions
+      */
+     @Override
+     public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
+         // Null checks run in the same order as before: service first, then identifier.
+         final ControllerService checkedService = requireNonNull(service);
+         final String checkedIdentifier = requireNonNull(identifier);
+         final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(checkedService, checkedIdentifier);
+         service.initialize(initContext);
+ 
+         // Resolve each property name to the descriptor the service reports for it.
+         final Map<PropertyDescriptor, String> resolvedProps = new HashMap<>();
+         for (final Map.Entry<String, String> property : properties.entrySet()) {
+             final PropertyDescriptor descriptor = service.getPropertyDescriptor(property.getKey());
+             resolvedProps.put(descriptor, property.getValue());
+         }
+ 
+         final MockConfigurationContext configurationContext = new MockConfigurationContext(resolvedProps, context);
+         try {
+             ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, service, configurationContext);
+         } catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException reflectionFailure) {
+             throw new InitializationException(reflectionFailure);
+         }
+ 
+         context.addControllerService(identifier, service, resolvedProps, null);
+     }
+ 
+     /**
+      * @param identifier identifier of the service to look up in the context
+      * @return the controller service the context returns for that identifier
+      */
+     @Override
+     public ControllerService getControllerService(final String identifier) {
+         final ControllerService service = context.getControllerService(identifier);
+         return service;
+     }
+ 
+     /**
+      * Looks up a controller service in the context and casts it to the
+      * requested type via {@link Class#cast(Object)}.
+      *
+      * @param identifier identifier of the service to look up
+      * @param serviceType the type to cast the service to
+      * @param <T> the requested service type
+      * @return the service cast to {@code serviceType}
+      */
+     @Override
+     public <T extends ControllerService> T getControllerService(final String identifier, final Class<T> serviceType) {
+         return serviceType.cast(context.getControllerService(identifier));
+     }
+ 
+     /**
+      * Removes a property from the context.
+      *
+      * @param descriptor descriptor of the property to remove
+      * @return the boolean returned by the context's {@code removeProperty}
+      */
+     @Override
+     public boolean removeProperty(PropertyDescriptor descriptor) {
+         final boolean removed = context.removeProperty(descriptor);
+         return removed;
+     }
+ 
+ }