You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/16 17:57:47 UTC
[2/8] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 0000000,6e5f65d..15591d7
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@@ -1,0 -1,261 +1,284 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.util;
+
+ import static java.util.Objects.requireNonNull;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
++import java.util.Set;
+
+ import org.apache.nifi.components.ConfigurableComponent;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
++import org.apache.nifi.processor.Processor;
++import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.SchedulingContext;
+ import org.junit.Assert;
+
+ public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup {
+
+ private final ConfigurableComponent component;
+ private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+ private String annotationData = null;
+ private boolean yieldCalled = false;
+ private boolean enableExpressionValidation = false;
+ private boolean allowExpressionValidation = true;
+
++ private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
++
+ /**
+ * Creates a new MockProcessContext for the given Processor
+ *
+ * @param component
+ */
+ public MockProcessContext(final ConfigurableComponent component) {
+ this.component = Objects.requireNonNull(component);
+ }
+
+ public MockProcessContext(final ControllerService component, final MockProcessContext context) {
+ this(component);
+
+ try {
+ annotationData = context.getControllerServiceAnnotationData(component);
+ final Map<PropertyDescriptor, String> props = context.getControllerServiceProperties(component);
+ properties.putAll(props);
+ } catch (IllegalArgumentException e) {
+ // do nothing...the service is being loaded
+ }
+ }
+
+ @Override
+ public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+ return getProperty(descriptor.getName());
+ }
+
+ @Override
+ public PropertyValue getProperty(final String propertyName) {
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
+ if (descriptor == null) {
+ return null;
+ }
+
+ final String setPropertyValue = properties.get(descriptor);
+ final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
+ return new MockPropertyValue(propValue, this, (enableExpressionValidation && allowExpressionValidation) ? descriptor : null);
+ }
+
+ @Override
+ public PropertyValue newPropertyValue(final String rawValue) {
+ return new MockPropertyValue(rawValue, this);
+ }
+
+ public ValidationResult setProperty(final String propertyName, final String propertyValue) {
+ return setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), propertyValue);
+ }
+
+ /**
+ * Updates the value of the property with the given PropertyDescriptor to
+ * the specified value IF and ONLY IF the value is valid according to the
+ * descriptor's validator. Otherwise, the property value is not updated. In
+ * either case, the ValidationResult is returned, indicating whether or not
+ * the property is valid
+ *
+ * @param descriptor
+ * @param value
+ * @return
+ */
+ public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
+ requireNonNull(descriptor);
+ requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
+ final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
+
+ final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this));
+ String oldValue = properties.put(fullyPopulatedDescriptor, value);
+ if (oldValue == null) {
+ oldValue = fullyPopulatedDescriptor.getDefaultValue();
+ }
+ if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) {
+ component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value);
+ }
+
+ return result;
+ }
+
+ public boolean removeProperty(final PropertyDescriptor descriptor) {
+ Objects.requireNonNull(descriptor);
+ final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
+ String value = null;
+ if (!fullyPopulatedDescriptor.isRequired() && (value = properties.remove(fullyPopulatedDescriptor)) != null) {
+ component.onPropertyModified(fullyPopulatedDescriptor, value, null);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void yield() {
+ yieldCalled = true;
+ }
+
+ public boolean isYieldCalled() {
+ return yieldCalled;
+ }
+
+ public void addControllerService(final String serviceIdentifier, final ControllerService controllerService, final Map<PropertyDescriptor, String> properties, final String annotationData) {
+ requireNonNull(controllerService);
+ final ControllerServiceConfiguration config = addControllerService(controllerService);
+ config.setProperties(properties);
+ config.setAnnotationData(annotationData);
+ }
+
+ @Override
+ public int getMaxConcurrentTasks() {
+ return 1;
+ }
+
+ public void setAnnotationData(final String annotationData) {
+ this.annotationData = annotationData;
+ }
+
+ @Override
+ public String getAnnotationData() {
+ return annotationData;
+ }
+
+ @Override
+ public Map<PropertyDescriptor, String> getProperties() {
+ final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
+ if (supported == null || supported.isEmpty()) {
+ return Collections.unmodifiableMap(properties);
+ } else {
+ final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
+ for (final PropertyDescriptor descriptor : supported) {
+ props.put(descriptor, null);
+ }
+ props.putAll(properties);
+ return props;
+ }
+ }
+
+ /**
+ * Validates the current properties, returning ValidationResults for any
+ * invalid properties. All processor defined properties will be validated.
+ * If they are not included in the in the purposed configuration, the
+ * default value will be used.
+ *
+ * @return Collection of validation result objects for any invalid findings
+ * only. If the collection is empty then the processor is valid. Guaranteed
+ * non-null
+ */
+ public Collection<ValidationResult> validate() {
+ return component.validate(new MockValidationContext(this));
+ }
+
+ public boolean isValid() {
+ for (final ValidationResult result : validate()) {
+ if (!result.isValid()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void assertValid() {
+ final StringBuilder sb = new StringBuilder();
+ int failureCount = 0;
+
+ for (final ValidationResult result : validate()) {
+ if (!result.isValid()) {
+ sb.append(result.toString()).append("\n");
+ failureCount++;
+ }
+ }
+
+ if (failureCount > 0) {
+ Assert.fail("Processor has " + failureCount + " validation failures:\n" + sb.toString());
+ }
+ }
+
+ @Override
+ public String encrypt(final String unencrypted) {
+ return "enc{" + unencrypted + "}";
+ }
+
+ @Override
+ public String decrypt(final String encrypted) {
+ if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
+ return encrypted.substring(4, encrypted.length() - 2);
+ }
+ return encrypted;
+ }
+
+ public void setValidateExpressionUsage(final boolean validate) {
+ allowExpressionValidation = validate;
+ }
+
+ public void enableExpressionValidation() {
+ enableExpressionValidation = true;
+ }
+
+ public void disableExpressionValidation() {
+ enableExpressionValidation = false;
+ }
+
+ Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
+ return super.getConfiguration(controllerService.getIdentifier()).getProperties();
+ }
+
+ String getControllerServiceAnnotationData(final ControllerService controllerService) {
+ return super.getConfiguration(controllerService.getIdentifier()).getAnnotationData();
+ }
+
+ @Override
+ public ControllerServiceLookup getControllerServiceLookup() {
+ return this;
+ }
+
+ @Override
+ public void leaseControllerService(final String identifier) {
+ }
+
++ public Set<Relationship> getAvailableRelationships() {
++ if ( !(component instanceof Processor) ) {
++ return Collections.emptySet();
++ }
++
++ final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
++ relationships.removeAll(unavailableRelationships);
++ return relationships;
++ }
++
++ public void setUnavailableRelationships(final Set<Relationship> relationships) {
++ this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
++ }
++
++ public Set<Relationship> getUnavailableRelationships() {
++ return unavailableRelationships;
++ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 0000000,552780c..ea55b34
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@@ -1,0 -1,1010 +1,1006 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.util;
+
+ import java.io.ByteArrayInputStream;
+ import java.io.ByteArrayOutputStream;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.OpenOption;
+ import java.nio.file.Path;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.regex.Pattern;
+
+ import org.junit.Assert;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.FlowFileFilter;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+
+ public class MockProcessSession implements ProcessSession {
+
+ private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
+ private final MockFlowFileQueue processorQueue;
+ private final Set<Long> beingProcessed = new HashSet<>();
+
+ private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
+ private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
+ private final SharedSessionState sharedState;
+ private final Map<String, Long> counterMap = new HashMap<>();
+
+ private boolean committed = false;
+ private boolean rolledback = false;
+ private int removedCount = 0;
+
+ public MockProcessSession(final SharedSessionState sharedState) {
+ this.sharedState = sharedState;
+ this.processorQueue = sharedState.getFlowFileQueue();
+ }
+
+ @Override
+ public void adjustCounter(final String name, final long delta, final boolean immediate) {
+ if (immediate) {
+ sharedState.adjustCounter(name, delta);
+ return;
+ }
+
+ Long counter = counterMap.get(name);
+ if (counter == null) {
+ counter = delta;
+ counterMap.put(name, counter);
+ return;
+ }
+
+ counter = counter + delta;
+ counterMap.put(name, counter);
+ }
+
+ @Override
+ public MockFlowFile clone(final FlowFile flowFile) {
+ validateState(flowFile);
+ final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ beingProcessed.add(newFlowFile.getId());
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) {
+ validateState(flowFile);
+ if (offset + size > flowFile.getSize()) {
+ throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString());
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
+ final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size));
+ newFlowFile.setData(newContent);
+
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ beingProcessed.add(newFlowFile.getId());
+ return newFlowFile;
+ }
+
+ @Override
+ public void commit() {
+ if (!beingProcessed.isEmpty()) {
+ throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
+ }
+ committed = true;
+ beingProcessed.clear();
+ currentVersions.clear();
+ originalVersions.clear();
+
+ for (final Map.Entry<String, Long> entry : counterMap.entrySet()) {
+ sharedState.adjustCounter(entry.getKey(), entry.getValue());
+ }
+
+ counterMap.clear();
+ }
+
+ /**
+ * Clear the 'committed' flag so that we can test that the next iteration of
+ * {@link nifi.processor.Processor#onTrigger} commits or rolls back the
+ * session
+ */
+ public void clearCommited() {
+ committed = false;
+ }
+
+ /**
+ * Clear the 'rolledBack' flag so that we can test that the next iteration
+ * of {@link nifi.processor.Processor#onTrigger} commits or rolls back the
+ * session
+ */
+ public void clearRollback() {
+ rolledback = false;
+ }
+
+ @Override
+ public MockFlowFile create() {
+ final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId());
+ currentVersions.put(flowFile.getId(), flowFile);
+ beingProcessed.add(flowFile.getId());
+ return flowFile;
+ }
+
+ @Override
+ public MockFlowFile create(final FlowFile flowFile) {
+ MockFlowFile newFlowFile = create();
+ newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ beingProcessed.add(newFlowFile.getId());
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile create(final Collection<FlowFile> flowFiles) {
+ MockFlowFile newFlowFile = create();
+ newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ beingProcessed.add(newFlowFile.getId());
+ return newFlowFile;
+ }
+
+ @Override
+ public void exportTo(final FlowFile flowFile, final OutputStream out) {
+ validateState(flowFile);
+ if (flowFile == null || out == null) {
+ throw new IllegalArgumentException("arguments cannot be null");
+ }
+
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ try {
+ out.write(mock.getData());
+ } catch (IOException e) {
+ throw new FlowFileAccessException(e.toString(), e);
+ }
+ }
+
+ @Override
+ public void exportTo(final FlowFile flowFile, final Path path, final boolean append) {
+ validateState(flowFile);
+ if (flowFile == null || path == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final OpenOption mode = append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE;
+
+ try (final OutputStream out = Files.newOutputStream(path, mode)) {
+ out.write(mock.getData());
+ } catch (final IOException e) {
+ throw new FlowFileAccessException(e.toString(), e);
+ }
+ }
+
+ @Override
+ public MockFlowFile get() {
+ final MockFlowFile flowFile = processorQueue.poll();
+ if (flowFile != null) {
+ beingProcessed.add(flowFile.getId());
+ currentVersions.put(flowFile.getId(), flowFile);
+ originalVersions.put(flowFile.getId(), flowFile);
+ }
+ return flowFile;
+ }
+
+ @Override
+ public List<FlowFile> get(final int maxResults) {
+ final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, maxResults));
+ for (int i = 0; i < maxResults; i++) {
+ final MockFlowFile nextFlowFile = get();
+ if (nextFlowFile == null) {
+ return flowFiles;
+ }
+
+ flowFiles.add(nextFlowFile);
+ }
+
+ return flowFiles;
+ }
+
+ @Override
+ public List<FlowFile> get(final FlowFileFilter filter) {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ final List<MockFlowFile> unselected = new ArrayList<>();
+
+ while (true) {
+ final MockFlowFile flowFile = processorQueue.poll();
+ if (flowFile == null) {
+ break;
+ }
+
+ final FlowFileFilter.FlowFileFilterResult filterResult = filter.filter(flowFile);
+ if (filterResult.isAccept()) {
+ flowFiles.add(flowFile);
+
+ beingProcessed.add(flowFile.getId());
+ currentVersions.put(flowFile.getId(), flowFile);
+ originalVersions.put(flowFile.getId(), flowFile);
+ } else {
+ unselected.add(flowFile);
+ }
+
+ if (!filterResult.isContinue()) {
+ break;
+ }
+ }
+
+ processorQueue.addAll(unselected);
+ return flowFiles;
+ }
+
+ @Override
+ public QueueSize getQueueSize() {
+ return processorQueue.size();
+ }
+
+ @Override
+ public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) {
+ validateState(flowFile);
+ if (in == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ try {
+ final byte[] data = readFully(in);
+ newFlowFile.setData(data);
+ return newFlowFile;
+ } catch (final IOException e) {
+ throw new FlowFileAccessException(e.toString(), e);
+ }
+ }
+
+ @Override
+ public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) {
+ validateState(flowFile);
+ if (path == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+ MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ Files.copy(path, baos);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException(e.toString(), e);
+ }
+
+ newFlowFile.setData(baos.toByteArray());
+ newFlowFile = putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), path.getFileName().toString());
+ return newFlowFile;
+ }
+
- @Override
- public Set<Relationship> getAvailableRelationships() {
- return sharedState.getAvailableRelationships();
- }
+
+ @Override
+ public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
+ for (final FlowFile source : sources) {
+ validateState(source);
+ }
+ validateState(destination);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ for (final FlowFile flowFile : sources) {
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+ final byte[] data = mock.getData();
+ try {
+ baos.write(data);
+ } catch (final IOException e) {
+ throw new AssertionError("Failed to write to BAOS");
+ }
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
+ newFlowFile.setData(baos.toByteArray());
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attrs) {
+ validateState(flowFile);
+ if (attrs == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ newFlowFile.putAttributes(attrs);
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) {
+ validateState(flowFile);
+ if (attrName == null || attrValue == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
+ }
+
+ if ("uuid".equals(attrName)) {
+ Assert.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production");
+ }
+
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put(attrName, attrValue);
+ newFlowFile.putAttributes(attrs);
+ return newFlowFile;
+ }
+
+ @Override
+ public void read(final FlowFile flowFile, final InputStreamCallback callback) {
+ if (callback == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+
+ validateState(flowFile);
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
+ try {
+ callback.process(bais);
+ } catch (IOException e) {
+ throw new ProcessException(e.toString(), e);
+ }
+ }
+
+ @Override
+ public void remove(final FlowFile flowFile) {
+ validateState(flowFile);
+ final Iterator<Long> itr = beingProcessed.iterator();
+ while (itr.hasNext()) {
+ final Long ffId = itr.next();
+ if (ffId != null && ffId.equals(flowFile.getId())) {
+ itr.remove();
+ beingProcessed.remove(ffId);
+ removedCount++;
+ currentVersions.remove(ffId);
+ return;
+ }
+ }
+
+ throw new ProcessException(flowFile + " not found in queue");
+ }
+
+ @Override
+ public void remove(final Collection<FlowFile> flowFiles) {
+ for (final FlowFile flowFile : flowFiles) {
+ validateState(flowFile);
+ }
+
+ for (final FlowFile flowFile : flowFiles) {
+ remove(flowFile);
+ }
+ }
+
+ @Override
+ public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> attrNames) {
+ validateState(flowFile);
+ if (attrNames == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ newFlowFile.removeAttributes(attrNames);
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
+ validateState(flowFile);
+ if (flowFile == null) {
+ throw new IllegalArgumentException("flowFile cannot be null");
+ }
+ if (keyPattern == null) {
+ return (MockFlowFile) flowFile;
+ }
+
+ final Set<String> attrsToRemove = new HashSet<>();
+ for (final String key : flowFile.getAttributes().keySet()) {
+ if (keyPattern.matcher(key).matches()) {
+ attrsToRemove.add(key);
+ }
+ }
+
+ return removeAllAttributes(flowFile, attrsToRemove);
+ }
+
+ @Override
+ public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) {
+ validateState(flowFile);
+ if (attrName == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ final Set<String> attrNames = new HashSet<>();
+ attrNames.add(attrName);
+ newFlowFile.removeAttributes(attrNames);
+ return newFlowFile;
+ }
+
+ @Override
+ public void rollback() {
+ rollback(false);
+ }
+
+ @Override
+ public void rollback(final boolean penalize) {
+ for (final List<MockFlowFile> list : transferMap.values()) {
+ for (final MockFlowFile flowFile : list) {
+ processorQueue.offer(flowFile);
+ }
+ }
+
+ for (final Long flowFileId : beingProcessed) {
+ final MockFlowFile flowFile = originalVersions.get(flowFileId);
+ if (flowFile != null) {
+ processorQueue.offer(flowFile);
+ }
+ }
+
+ rolledback = true;
+ beingProcessed.clear();
+ currentVersions.clear();
+ originalVersions.clear();
+ transferMap.clear();
+ clearTransferState();
+ }
+
+ @Override
+ public void transfer(final FlowFile flowFile) {
+ validateState(flowFile);
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("I only accept MockFlowFile");
+ }
+
+ beingProcessed.remove(flowFile.getId());
+ processorQueue.offer((MockFlowFile) flowFile);
+ }
+
+ @Override
+ public void transfer(final Collection<FlowFile> flowFiles) {
+ for (final FlowFile flowFile : flowFiles) {
+ transfer(flowFile);
+ }
+ }
+
+ @Override
+ public void transfer(final FlowFile flowFile, final Relationship relationship) {
+ validateState(flowFile);
+ List<MockFlowFile> list = transferMap.get(relationship);
+ if (list == null) {
+ list = new ArrayList<>();
+ transferMap.put(relationship, list);
+ }
+
+ beingProcessed.remove(flowFile.getId());
+ list.add((MockFlowFile) flowFile);
+ }
+
+ @Override
+ public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
+ for (final FlowFile flowFile : flowFiles) {
+ validateState(flowFile);
+ }
+
+ List<MockFlowFile> list = transferMap.get(relationship);
+ if (list == null) {
+ list = new ArrayList<>();
+ transferMap.put(relationship, list);
+ }
+
+ for (final FlowFile flowFile : flowFiles) {
+ beingProcessed.remove(flowFile.getId());
+ list.add((MockFlowFile) flowFile);
+ }
+ }
+
+ @Override
+ public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) {
+ validateState(flowFile);
+ if (callback == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ callback.process(baos);
+ } catch (final IOException e) {
+ throw new ProcessException(e.toString(), e);
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ newFlowFile.setData(baos.toByteArray());
+ return newFlowFile;
+ }
+
+ @Override
+ public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) {
+ validateState(flowFile);
+ if (callback == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ baos.write(mock.getData());
+ callback.process(baos);
+ } catch (final IOException e) {
+ throw new ProcessException(e.toString(), e);
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ newFlowFile.setData(baos.toByteArray());
+ return newFlowFile;
+ }
+
+ @Override
+ public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
+ validateState(flowFile);
+ if (callback == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ if (!(flowFile instanceof MockFlowFile)) {
+ throw new IllegalArgumentException("Cannot export a flow file that I did not create");
+ }
+ final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ callback.process(in, out);
+ } catch (final IOException e) {
+ throw new ProcessException(e.toString(), e);
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+ newFlowFile.setData(out.toByteArray());
+
+ return newFlowFile;
+ }
+
+ private byte[] readFully(final InputStream in) throws IOException {
+ final byte[] buffer = new byte[4096];
+ int len;
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ while ((len = in.read(buffer)) >= 0) {
+ baos.write(buffer, 0, len);
+ }
+
+ return baos.toByteArray();
+ }
+
+ public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
+ List<MockFlowFile> list = this.transferMap.get(relationship);
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+
+ return list;
+ }
+
+ /**
+ * Returns a List of FlowFiles in the order in which they were transferred
+ * to the given relationship
+ *
+ * @param relationship
+ * @return
+ */
+ public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
+ final Relationship procRel = new Relationship.Builder().name(relationship).build();
+ return getFlowFilesForRelationship(procRel);
+ }
+
+ public MockFlowFile createFlowFile(final File file) throws IOException {
+ return createFlowFile(Files.readAllBytes(file.toPath()));
+ }
+
+ public MockFlowFile createFlowFile(final byte[] data) {
+ final MockFlowFile flowFile = create();
+ flowFile.setData(data);
+ return flowFile;
+ }
+
+ public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) {
+ final MockFlowFile ff = createFlowFile(data);
+ ff.putAttributes(attrs);
+ return ff;
+ }
+
+ @Override
+ public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
+ for (final FlowFile flowFile : sources) {
+ validateState(flowFile);
+ }
+ validateState(destination);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ if (header != null) {
+ baos.write(header);
+ }
+
+ int count = 0;
+ for (final FlowFile flowFile : sources) {
+ baos.write(((MockFlowFile) flowFile).getData());
+ if (demarcator != null && ++count != sources.size()) {
+ baos.write(demarcator);
+ }
+ }
+
+ if (footer != null) {
+ baos.write(footer);
+ }
+ } catch (final IOException e) {
+ throw new AssertionError("failed to write data to BAOS");
+ }
+
+ final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
+ newFlowFile.setData(baos.toByteArray());
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ return newFlowFile;
+ }
+
+ private void validateState(final FlowFile flowFile) {
+ Objects.requireNonNull(flowFile);
+ final FlowFile currentVersion = currentVersions.get(flowFile.getId());
+ if (currentVersion == null) {
+ throw new FlowFileHandlingException(flowFile + " is not known in this session");
+ }
+
+ if (currentVersion != flowFile) {
+ throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session");
+ }
+
+ for (final List<MockFlowFile> flowFiles : transferMap.values()) {
+ if (flowFiles.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " has already been transferred");
+ }
+ }
+ }
+
+ /**
+ * Inherits the attributes from the given source flow file into another flow
+ * file. The UUID of the source becomes the parent UUID of this flow file.
+ * If a parent uuid had previously been established it will be replaced by
+ * the uuid of the given source
+ *
+ * @param source the FlowFile from which to copy attributes
+ * @param destination the FlowFile to which to copy attributes
+ */
+ private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) {
+ if (source == null || destination == null || source == destination) {
+ return destination; //don't need to inherit from ourselves
+ }
+ FlowFile updated = putAllAttributes(destination, source.getAttributes());
+ getProvenanceReporter().fork(source, Collections.singletonList(updated));
+ return updated;
+ }
+
+ /**
+ * Inherits the attributes from the given source flow files into the
+ * destination flow file. The UUIDs of the sources becomes the parent UUIDs
+ * of the destination flow file. Only attributes which is common to all
+ * source items is copied into this flow files attributes. Any previously
+ * established parent UUIDs will be replaced by the UUIDs of the sources. It
+ * will capture the uuid of a certain number of source objects and may not
+ * capture all of them. How many it will capture is unspecified.
+ *
+ * @param sources
+ */
+ private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) {
+ final String uuid = destination.getAttribute(CoreAttributes.UUID.key());
+ final StringBuilder parentUuidBuilder = new StringBuilder();
+ int uuidsCaptured = 0;
+ for (final FlowFile source : sources) {
+ if (source == destination) {
+ continue; //don't want to capture parent uuid of this. Something can't be a child of itself
+ }
+ final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key());
+ if (sourceUuid != null && !sourceUuid.trim().isEmpty()) {
+ uuidsCaptured++;
+ if (parentUuidBuilder.length() > 0) {
+ parentUuidBuilder.append(",");
+ }
+ parentUuidBuilder.append(sourceUuid);
+ }
+
+ if (uuidsCaptured > 100) {
+ break;
+ }
+ }
+
+ FlowFile updated = putAllAttributes(destination, intersectAttributes(sources));
+ getProvenanceReporter().join(sources, updated);
+ return updated;
+ }
+
+ /**
+ * Returns the attributes that are common to every flow file given. The key
+ * and value must match exactly.
+ *
+ * @param flowFileList a list of flow files
+ *
+ * @return the common attributes
+ */
+ private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
+ final Map<String, String> result = new HashMap<>();
+ //trivial cases
+ if (flowFileList == null || flowFileList.isEmpty()) {
+ return result;
+ } else if (flowFileList.size() == 1) {
+ result.putAll(flowFileList.iterator().next().getAttributes());
+ }
+
+ /*
+ * Start with the first attribute map and only put an entry to the
+ * resultant map if it is common to every map.
+ */
+ final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
+
+ outer:
+ for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+ final String key = mapEntry.getKey();
+ final String value = mapEntry.getValue();
+ for (final FlowFile flowFile : flowFileList) {
+ final Map<String, String> currMap = flowFile.getAttributes();
+ final String curVal = currMap.get(key);
+ if (curVal == null || !curVal.equals(value)) {
+ continue outer;
+ }
+ }
+ result.put(key, value);
+ }
+
+ return result;
+ }
+
+ /**
+ * Assert that {@link #commit()} has been called
+ */
+ public void assertCommitted() {
+ Assert.assertTrue("Session was not committed", committed);
+ }
+
+ /**
+ * Assert that {@link #commit()} has not been called
+ */
+ public void assertNotCommitted() {
+ Assert.assertFalse("Session was committed", committed);
+ }
+
+ /**
+ * Assert that {@link #rollback()} has been called
+ */
+ public void assertRolledBack() {
+ Assert.assertTrue("Session was not rolled back", rolledback);
+ }
+
+ /**
+ * Assert that {@link #rollback()} has not been called
+ */
+ public void assertNotRolledBack() {
+ Assert.assertFalse("Session was rolled back", rolledback);
+ }
+
+ /**
+ * Assert that the number of FlowFiles transferred to the given relationship
+ * is equal to the given count
+ *
+ * @param relationship
+ * @param count
+ */
+ public void assertTransferCount(final Relationship relationship, final int count) {
+ final int transferCount = getFlowFilesForRelationship(relationship).size();
+ Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to "
+ + relationship + " but actual transfer count was " + transferCount, count, transferCount);
+ }
+
+ /**
+ * Assert that the number of FlowFiles transferred to the given relationship
+ * is equal to the given count
+ *
+ * @param relationship
+ * @param count
+ */
+ public void assertTransferCount(final String relationship, final int count) {
+ assertTransferCount(new Relationship.Builder().name(relationship).build(), count);
+ }
+
+ /**
+ * Assert that there are no FlowFiles left on the input queue.
+ */
+ public void assertQueueEmpty() {
+ Assert.assertTrue("FlowFile Queue has " + this.processorQueue.size() + " items", this.processorQueue.isEmpty());
+ }
+
+ /**
+ * Assert that at least one FlowFile is on the input queue
+ */
+ public void assertQueueNotEmpty() {
+ Assert.assertFalse("FlowFile Queue is empty", this.processorQueue.isEmpty());
+ }
+
+ /**
+ * Asserts that all FlowFiles that were transferred were transferred to the
+ * given relationship
+ *
+ * @param relationship
+ */
+ public void assertAllFlowFilesTransferred(final String relationship) {
+ assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build());
+ }
+
+ /**
+ * Asserts that all FlowFiles that were transferred were transferred to the
+ * given relationship
+ *
+ * @param relationship
+ */
+ public void assertAllFlowFilesTransferred(final Relationship relationship) {
+ for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
+ final Relationship rel = entry.getKey();
+ final List<MockFlowFile> flowFiles = entry.getValue();
+
+ if (!rel.equals(relationship) && flowFiles != null && !flowFiles.isEmpty()) {
+ Assert.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + flowFiles.size() + " were routed to " + rel);
+ }
+ }
+ }
+
+ /**
+ * Removes all state information about FlowFiles that have been transferred
+ */
+ public void clearTransferState() {
+ this.transferMap.clear();
+ }
+
+ /**
+ * Asserts that all FlowFiles that were transferred were transferred to the
+ * given relationship and that the number of FlowFiles transferred is equal
+ * to <code>count</code>
+ *
+ * @param relationship
+ * @param count
+ */
+ public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
+ assertAllFlowFilesTransferred(relationship);
+ assertTransferCount(relationship, count);
+ }
+
+ /**
+ * Asserts that all FlowFiles that were transferred were transferred to the
+ * given relationship and that the number of FlowFiles transferred is equal
+ * to <code>count</code>
+ *
+ * @param relationship
+ * @param count
+ */
+ public void assertAllFlowFilesTransferred(final String relationship, final int count) {
+ assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count);
+ }
+
+ /**
+ * Returns the number of FlowFiles that were removed
+ *
+ * @return
+ */
+ public int getRemovedCount() {
+ return removedCount;
+ }
+
+ @Override
+ public ProvenanceReporter getProvenanceReporter() {
+ return sharedState.getProvenanceReporter();
+ }
+
+ @Override
+ public MockFlowFile penalize(final FlowFile flowFile) {
+ validateState(flowFile);
+ final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
+ mockFlowFile.setPenalized();
+ return mockFlowFile;
+ }
+
+ public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+ validateState(flowFile);
+ return flowFile.getData();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index 0000000,96bef71..13a87de
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@@ -1,0 -1,91 +1,72 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.util;
+
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicLong;
+
+ import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+
+ public class SharedSessionState {
+
+ private final MockFlowFileQueue flowFileQueue;
+ private final ProvenanceReporter provenanceReporter;
++ @SuppressWarnings("unused")
+ private final Processor processor;
+ private final AtomicLong flowFileIdGenerator;
+ private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+
- private volatile Set<Relationship> unavailableRelationships;
+
+ public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
+ flowFileQueue = new MockFlowFileQueue();
+ provenanceReporter = new MockProvenanceReporter();
- unavailableRelationships = new HashSet<>();
+ this.flowFileIdGenerator = flowFileIdGenerator;
+ this.processor = processor;
+ }
+
- public Set<Relationship> getAvailableRelationships() {
- final Set<Relationship> relationships = new HashSet<>(processor.getRelationships());
- relationships.removeAll(unavailableRelationships);
- return relationships;
- }
-
- public void setUnavailableRelationships(final Set<Relationship> relationships) {
- this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
- }
-
- public Set<Relationship> getUnavailableRelationships() {
- return unavailableRelationships;
- }
-
+ public MockFlowFileQueue getFlowFileQueue() {
+ return flowFileQueue;
+ }
+
+ public ProvenanceReporter getProvenanceReporter() {
+ return provenanceReporter;
+ }
+
+ public long nextFlowFileId() {
+ return flowFileIdGenerator.getAndIncrement();
+ }
+
+ public void adjustCounter(final String name, final long delta) {
+ AtomicLong counter = counterMap.get(name);
+ if (counter == null) {
+ counter = new AtomicLong(0L);
+ AtomicLong existingCounter = counterMap.putIfAbsent(name, counter);
+ if (existingCounter != null) {
+ counter = existingCounter;
+ }
+ }
+
+ counter.addAndGet(delta);
+ }
+
+ public Long getCounterValue(final String name) {
+ final AtomicLong counterValue = counterMap.get(name);
+ return (counterValue == null) ? null : counterValue.get();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0000000,54b611d..40d5035
mode 000000,100644..100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@@ -1,0 -1,492 +1,492 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.util;
+
+ import static java.util.Objects.requireNonNull;
+
+ import java.io.ByteArrayInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.lang.reflect.InvocationTargetException;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.annotation.OnConfigured;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.annotation.OnAdded;
+ import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.processor.annotation.OnShutdown;
+ import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.processor.annotation.TriggerSerially;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ import org.apache.nifi.reporting.InitializationException;
+
+ import org.junit.Assert;
+
+ public class StandardProcessorTestRunner implements TestRunner {
+
+ private final Processor processor;
+ private final MockProcessContext context;
+ private final MockFlowFileQueue flowFileQueue;
+ private final MockSessionFactory sessionFactory;
+ private final SharedSessionState sharedState;
+ private final AtomicLong idGenerator;
+ private final boolean triggerSerially;
+
+ private int numThreads = 1;
+ private final AtomicInteger invocations = new AtomicInteger(0);
+
+ StandardProcessorTestRunner(final Processor processor) {
+ this.processor = processor;
+ this.idGenerator = new AtomicLong(0L);
+ this.sharedState = new SharedSessionState(processor, idGenerator);
+ this.flowFileQueue = sharedState.getFlowFileQueue();
+ this.sessionFactory = new MockSessionFactory(sharedState);
+ this.context = new MockProcessContext(processor);
+
+ final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
+ processor.initialize(mockInitContext);
+
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+ } catch (Exception e) {
+ Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + e);
+ }
+
+ triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class);
+ }
+
+ @Override
+ public void setValidateExpressionUsage(final boolean validate) {
+ context.setValidateExpressionUsage(validate);
+ }
+
+ @Override
+ public Processor getProcessor() {
+ return processor;
+ }
+
+ @Override
+ public MockProcessContext getProcessContext() {
+ return context;
+ }
+
+ @Override
+ public void run() {
+ run(1);
+ }
+
+ @Override
+ public void run(int iterations) {
+ run(iterations, true);
+ }
+
+ @Override
+ public void run(final int iterations, final boolean stopOnFinish) {
+ run(iterations, stopOnFinish, true);
+ }
+
+ @Override
+ public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
+ if (iterations < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ context.assertValid();
+ context.enableExpressionValidation();
+ try {
+ if ( initialize ) {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e);
+ }
+ }
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+ @SuppressWarnings("unchecked")
+ final Future<Throwable>[] futures = new Future[iterations];
+ for (int i = 0; i < iterations; i++) {
+ final Future<Throwable> future = executorService.submit(new RunProcessor());
+ futures[i] = future;
+ }
+
+ executorService.shutdown();
+
+ int finishedCount = 0;
+ boolean unscheduledRun = false;
+ for (final Future<Throwable> future : futures) {
+ try {
+ final Throwable thrown = future.get(); // wait for the result
+ if (thrown != null) {
+ throw new AssertionError(thrown);
+ }
+
+ if (++finishedCount == 1) {
+ unscheduledRun = true;
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+ } catch (Exception e) {
+ Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+ }
+ }
+ } catch (final Exception e) {
+ }
+ }
+
+ if (!unscheduledRun) {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+ } catch (Exception e) {
+ Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+ }
+ }
+
+ if (stopOnFinish) {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor);
+ } catch (Exception e) {
+ Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
+ }
+ }
+ } finally {
+ context.disableExpressionValidation();
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor);
+ } catch (Exception e) {
+ Assert.fail("Could not invoke methods annotated with @OnShutdown annotation due to: " + e);
+ }
+ }
+
+ private class RunProcessor implements Callable<Throwable> {
+
+ @Override
+ public Throwable call() throws Exception {
+ invocations.incrementAndGet();
+ try {
+ processor.onTrigger(context, sessionFactory);
+ } catch (final Throwable t) {
+ return t;
+ }
+
+ return null;
+ }
+ }
+
+ @Override
+ public ProcessSessionFactory getProcessSessionFactory() {
+ return sessionFactory;
+ }
+
+ @Override
+ public void assertAllFlowFilesTransferred(final String relationship) {
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ session.assertAllFlowFilesTransferred(relationship);
+ }
+ }
+
+ @Override
+ public void assertAllFlowFilesTransferred(final Relationship relationship) {
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ session.assertAllFlowFilesTransferred(relationship);
+ }
+ }
+
+ @Override
+ public void assertAllFlowFilesTransferred(final String relationship, final int count) {
+ assertAllFlowFilesTransferred(relationship);
+ assertTransferCount(relationship, count);
+ }
+
+ @Override
+ public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
+ assertAllFlowFilesTransferred(relationship);
+ assertTransferCount(relationship, count);
+ }
+
+ @Override
+ public void assertTransferCount(final Relationship relationship, final int count) {
+ Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size());
+ }
+
+ @Override
+ public void assertTransferCount(final String relationship, final int count) {
+ Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size());
+ }
+
+ @Override
+ public void assertValid() {
+ context.assertValid();
+ }
+
+ @Override
+ public void assertNotValid() {
+ Assert.assertFalse("Processor appears to be valid but expected it to be invalid", context.isValid());
+ }
+
+ @Override
+ public boolean isQueueEmpty() {
+ return flowFileQueue.isEmpty();
+ }
+
+ @Override
+ public void assertQueueEmpty() {
+ Assert.assertTrue(flowFileQueue.isEmpty());
+ }
+
+ @Override
+ public void assertQueueNotEmpty() {
+ Assert.assertFalse(flowFileQueue.isEmpty());
+ }
+
+ @Override
+ public void clearTransferState() {
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ session.clearTransferState();
+ }
+ }
+
+ @Override
+ public void enqueue(final FlowFile... flowFiles) {
+ for (final FlowFile flowFile : flowFiles) {
+ flowFileQueue.offer((MockFlowFile) flowFile);
+ }
+ }
+
+ @Override
+ public void enqueue(final Path path) throws IOException {
+ enqueue(path, new HashMap<String, String>());
+ }
+
+ @Override
+ public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+ final Map<String, String> modifiedAttributes = new HashMap<>(attributes);
+ if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
+ modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName());
+ }
+ try (final InputStream in = Files.newInputStream(path)) {
+ enqueue(in, modifiedAttributes);
+ }
+ }
+
+ @Override
+ public void enqueue(final byte[] data) {
+ enqueue(data, new HashMap<String, String>());
+ }
+
+ @Override
+ public void enqueue(final byte[] data, final Map<String, String> attributes) {
+ enqueue(new ByteArrayInputStream(data), attributes);
+ }
+
+ @Override
+ public void enqueue(final InputStream data) {
+ enqueue(data, new HashMap<String, String>());
+ }
+
+ @Override
+ public void enqueue(final InputStream data, final Map<String, String> attributes) {
+ final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator));
+ MockFlowFile flowFile = session.create();
+ flowFile = session.importFrom(data, flowFile);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ enqueue(flowFile);
+ }
+
+ @Override
+ public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+ return flowFile.getData();
+ }
+
+ @Override
+ public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
+ final Relationship rel = new Relationship.Builder().name(relationship).build();
+ return getFlowFilesForRelationship(rel);
+ }
+
+ @Override
+ public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
+ final List<MockFlowFile> flowFiles = new ArrayList<>();
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ flowFiles.addAll(session.getFlowFilesForRelationship(relationship));
+ }
+
+ Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
+ @Override
+ public int compare(final MockFlowFile o1, final MockFlowFile o2) {
+ return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+ }
+ });
+
+ return flowFiles;
+ }
+
+ @Override
+ public ProvenanceReporter getProvenanceReporter() {
+ return sharedState.getProvenanceReporter();
+ }
+
+ @Override
+ public QueueSize getQueueSize() {
+ return flowFileQueue.size();
+ }
+
+ @Override
+ public Long getCounterValue(final String name) {
+ return sharedState.getCounterValue(name);
+ }
+
+ @Override
+ public int getRemovedCount() {
+ int count = 0;
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ count += session.getRemovedCount();
+ }
+
+ return count;
+ }
+
+ @Override
+ public void setAnnotationData(final String annotationData) {
+ context.setAnnotationData(annotationData);
+ }
+
+ @Override
+ public ValidationResult setProperty(final String propertyName, final String propertyValue) {
+ return context.setProperty(propertyName, propertyValue);
+ }
+
+ @Override
+ public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
+ return context.setProperty(descriptor, value);
+ }
+
+ @Override
+ public ValidationResult setProperty(final PropertyDescriptor descriptor, final AllowableValue value) {
+ return context.setProperty(descriptor, value.getValue());
+ }
+
+ @Override
+ public void setThreadCount(final int threadCount) {
+ if (threadCount > 1 && triggerSerially) {
+ Assert.fail("Cannot set thread-count higher than 1 because the processor is triggered serially");
+ }
+
+ this.numThreads = threadCount;
+ }
+
+ @Override
+ public int getThreadCount() {
+ return numThreads;
+ }
+
+ @Override
+ public void setRelationshipAvailable(final Relationship relationship) {
- final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
++ final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
+ unavailable.remove(relationship);
- sharedState.setUnavailableRelationships(unavailable);
++ context.setUnavailableRelationships(unavailable);
+ }
+
+ @Override
+ public void setRelationshipAvailable(final String relationshipName) {
+ setRelationshipAvailable(new Relationship.Builder().name(relationshipName).build());
+ }
+
+ @Override
+ public void setRelationshipUnavailable(final Relationship relationship) {
- final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships());
++ final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships());
+ unavailable.add(relationship);
- sharedState.setUnavailableRelationships(unavailable);
++ context.setUnavailableRelationships(unavailable);
+ }
+
+ @Override
+ public void setRelationshipUnavailable(final String relationshipName) {
+ setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build());
+ }
+
+ @Override
+ public void addControllerService(final String identifier, final ControllerService service) throws InitializationException {
+ addControllerService(identifier, service, new HashMap<String, String>());
+ }
+
+ @Override
+ public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
+ final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier));
+ service.initialize(initContext);
+
+ final Map<PropertyDescriptor, String> resolvedProps = new HashMap<>();
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ resolvedProps.put(service.getPropertyDescriptor(entry.getKey()), entry.getValue());
+ }
+
+ final MockConfigurationContext configurationContext = new MockConfigurationContext(resolvedProps, context);
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, service, configurationContext);
+ } catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException e) {
+ throw new InitializationException(e);
+ }
+
+ context.addControllerService(identifier, service, resolvedProps, null);
+ }
+
+ @Override
+ public ControllerService getControllerService(final String identifier) {
+ return context.getControllerService(identifier);
+ }
+
+ @Override
+ public <T extends ControllerService> T getControllerService(final String identifier, final Class<T> serviceType) {
+ final ControllerService service = context.getControllerService(identifier);
+ return serviceType.cast(service);
+ }
+
+ @Override
+ public boolean removeProperty(PropertyDescriptor descriptor) {
+ return context.removeProperty(descriptor);
+ }
+
+ }