You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:56 UTC

[48/51] [partial] incubator-nifi git commit: NIFI-270 Made all changes identified by adam, mark, joey to prep for a cleaner build

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
deleted file mode 100644
index afb56e8..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.nifi.provenance.lineage.ComputeLineageResult;
-import org.apache.nifi.provenance.lineage.EdgeNode;
-import org.apache.nifi.provenance.lineage.EventNode;
-import org.apache.nifi.provenance.lineage.FlowFileNode;
-import org.apache.nifi.provenance.lineage.LineageEdge;
-import org.apache.nifi.provenance.lineage.LineageNode;
-
-/**
- * Collects the Provenance Event Records gathered for a lineage query and, once
- * every expected step has reported without error, builds the lineage graph
- * (nodes and edges) from them.
- * <p>
- * Mutable state is guarded by a read/write lock; the cancel flag is a volatile
- * field that is written without taking the lock.
- */
-public class StandardLineageResult implements ComputeLineageResult {
-
-    /** Time-to-live of a result, in milliseconds (30 minutes). */
-    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
-    private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
-
-    private final Collection<String> flowFileUuids;
-    private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
-    private final Set<LineageNode> nodes = new HashSet<>();
-    private final Set<LineageEdge> edges = new HashSet<>();
-    private final int numSteps;
-    private final long creationNanos;
-    private long computationNanos;
-
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
-
-    private Date expirationDate = null;
-    private String error = null;
-    private int numCompletedSteps = 0;
-
-    private volatile boolean canceled = false;
-
-    /**
-     * @param numSteps the number of {@link #update(Collection)} / {@link #setError(String)}
-     * calls after which this result is considered finished
-     * @param flowFileUuids the FlowFile UUIDs for which child FlowFile nodes are created
-     * while building the graph; the collection is stored as-is, not copied
-     */
-    public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
-        this.numSteps = numSteps;
-        this.creationNanos = System.nanoTime();
-        this.flowFileUuids = flowFileUuids;
-
-        updateExpiration();
-    }
-
-    /**
-     * @return a snapshot copy of the nodes computed so far
-     */
-    @Override
-    public List<LineageNode> getNodes() {
-        readLock.lock();
-        try {
-            return new ArrayList<>(nodes);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return a snapshot copy of the edges computed so far
-     */
-    @Override
-    public List<LineageEdge> getEdges() {
-        readLock.lock();
-        try {
-            return new ArrayList<>(edges);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the number of edges currently in the graph
-     */
-    public int getNumberOfEdges() {
-        readLock.lock();
-        try {
-            return edges.size();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the number of nodes currently in the graph
-     */
-    public int getNumberOfNodes() {
-        readLock.lock();
-        try {
-            return nodes.size();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @param timeUnit the unit in which to express the result
-     * @return the time between construction of this object and completion of the final
-     * step, converted to the given unit; zero until the final step has completed
-     */
-    public long getComputationTime(final TimeUnit timeUnit) {
-        readLock.lock();
-        try {
-            return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the expiration date, which is pushed out by {@link #TTL} milliseconds
-     * every time a step completes
-     */
-    @Override
-    public Date getExpiration() {
-        readLock.lock();
-        try {
-            return expirationDate;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the most recently recorded error message, or {@code null} if none
-     */
-    @Override
-    public String getError() {
-        readLock.lock();
-        try {
-            return error;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the percentage of steps completed, in the range 0 to 100; always 100 when
-     * fewer than one step was requested
-     */
-    @Override
-    public int getPercentComplete() {
-        readLock.lock();
-        try {
-            if (numSteps < 1) {
-                return 100;
-            }
-
-            final int percent = (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
-            // More completions than expected steps may be reported (for example an error
-            // reported after the final update); never claim more than 100%.
-            return Math.min(100, percent);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return {@code true} once all steps have completed or the computation was canceled
-     */
-    @Override
-    public boolean isFinished() {
-        readLock.lock();
-        try {
-            return numCompletedSteps >= numSteps || canceled;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Records a failed step. The step still counts as completed, and the expiration is
-     * refreshed.
-     *
-     * @param error the error message to expose through {@link #getError()}
-     */
-    public void setError(final String error) {
-        writeLock.lock();
-        try {
-            this.error = error;
-            numCompletedSteps++;
-
-            updateExpiration();
-
-            if (numCompletedSteps >= numSteps) {
-                computationNanos = System.nanoTime() - creationNanos;
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Records a successful step. When this is the final step and no error has been
-     * recorded, the lineage graph is computed from all records gathered so far.
-     *
-     * @param records the Provenance Event Records produced by the step
-     */
-    public void update(final Collection<ProvenanceEventRecord> records) {
-        writeLock.lock();
-        try {
-            relevantRecords.addAll(records);
-
-            numCompletedSteps++;
-            updateExpiration();
-
-            if (numCompletedSteps >= numSteps && error == null) {
-                computeLineage();
-                computationNanos = System.nanoTime() - creationNanos;
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Computes the lineage from the relevant Provenance Event Records. This
-     * method must be called with the write lock held and is only going to be
-     * useful after all of the records have been successfully obtained.
-     * <p>
-     * If two events claim to have generated the same FlowFile, an error is recorded
-     * and the method returns early, leaving the partially built graph in place.
-     */
-    private void computeLineage() {
-        final long startNanos = System.nanoTime();
-
-        nodes.clear();
-        edges.clear();
-
-        final Map<String, LineageNode> lastEventMap = new HashMap<>();    // maps FlowFile UUID to last event for that FlowFile
-        final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
-        Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() {
-            @Override
-            public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
-                // Sort on Event Time, then Event ID.
-                final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
-                if (eventTimeComparison == 0) {
-                    return Long.compare(o1.getEventId(), o2.getEventId());
-                } else {
-                    return eventTimeComparison;
-                }
-            }
-        });
-
-        // convert the Provenance Event Records into Lineage nodes (FlowFileNode, EventNodes).
-        for (final ProvenanceEventRecord record : sortedRecords) {
-            final LineageNode lineageNode = new EventNode(record);
-            final boolean added = nodes.add(lineageNode);
-            if (!added) {
-                logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode);
-            }
-
-            // Create an edge that connects this node to the previous node for the same FlowFile UUID.
-            final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid());
-            if (lastNode != null) {
-                // The Edge UUID depends on the event type. For JOIN, CLONE and REPLAY events
-                // we use the previous node's UUID, because such an event's own UUID pertains to
-                // only one of (potentially) many UUIDs associated with the event. Otherwise, we
-                // know that the UUID of this record is appropriate, so we just use it.
-                final String edgeUuid;
-
-                switch (record.getEventType()) {
-                    case JOIN:
-                    case CLONE:
-                    case REPLAY:
-                        edgeUuid = lastNode.getFlowFileUuid();
-                        break;
-                    default:
-                        edgeUuid = record.getFlowFileUuid();
-                        break;
-                }
-
-                edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode));
-            }
-
-            lastEventMap.put(record.getFlowFileUuid(), lineageNode);
-
-            switch (record.getEventType()) {
-                case FORK:
-                case JOIN:
-                case REPLAY:
-                case CLONE: {
-                    // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate
-                    for (final String childUuid : record.getChildUuids()) {
-                        if (flowFileUuids.contains(childUuid)) {
-                            final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime());
-                            final boolean isNewFlowFile = nodes.add(childNode);
-                            if (!isNewFlowFile) {
-                                failLineage("Unable to generate Lineage Graph because multiple events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")");
-                                return;
-                            }
-
-                            edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode));
-                            lastEventMap.put(childUuid, childNode);
-                        }
-                    }
-                    for (final String parentUuid : record.getParentUuids()) {
-                        final LineageNode lastNodeForParent = lastEventMap.get(parentUuid);
-                        if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) {
-                            edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode));
-                        }
-
-                        lastEventMap.put(parentUuid, lineageNode);
-                    }
-                }
-                break;
-                case RECEIVE:
-                case CREATE: {
-                    // for a RECEIVE or CREATE event, we want to create a FlowFile Node that represents
-                    // the FlowFile and create an edge from the Event to the FlowFile Node
-                    final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime());
-                    final boolean isNewFlowFile = nodes.add(flowFileNode);
-                    if (!isNewFlowFile) {
-                        failLineage("Found cycle in graph. This indicates that multiple events were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")");
-                        return;
-                    }
-                    edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode));
-                    lastEventMap.put(record.getFlowFileUuid(), flowFileNode);
-                }
-                break;
-                default:
-                    break;
-            }
-        }
-
-        final long nanos = System.nanoTime() - startNanos;
-        logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos));
-    }
-
-    /**
-     * Logs and records a failure detected while building the graph. Must be called with
-     * the write lock held. Unlike {@link #setError(String)} this does not count another
-     * completed step: the step being processed was already counted by the caller.
-     */
-    private void failLineage(final String msg) {
-        logger.error(msg);
-        error = msg;
-    }
-
-    /**
-     * Marks this computation as canceled, which causes {@link #isFinished()} to return
-     * {@code true}.
-     */
-    void cancel() {
-        this.canceled = true;
-    }
-
-    /**
-     * Must be called with write lock!
-     */
-    private void updateExpiration() {
-        expirationDate = new Date(System.currentTimeMillis() + TTL);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
deleted file mode 100644
index cfbae88..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.Relationship;
-
-/**
- * Holder for provenance relevant information
- * <p/>
- * @author none
- */
-public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {
-
-    private final long eventTime;
-    private final long entryDate;
-    private final ProvenanceEventType eventType;
-    private final long lineageStartDate;
-    private final Set<String> lineageIdentifiers;
-    private final String componentId;
-    private final String componentType;
-    private final String transitUri;
-    private final String sourceSystemFlowFileIdentifier;
-    private final String uuid;
-    private final List<String> parentUuids;
-    private final List<String> childrenUuids;
-    private final String alternateIdentifierUri;
-    private final String details;
-    private final String relationship;
-    private final long storageByteOffset;
-    private final String storageFilename;
-    private final long eventDuration;
-
-    private final String contentClaimSection;
-    private final String contentClaimContainer;
-    private final String contentClaimIdentifier;
-    private final Long contentClaimOffset;
-    private final long contentSize;
-
-    private final String previousClaimSection;
-    private final String previousClaimContainer;
-    private final String previousClaimIdentifier;
-    private final Long previousClaimOffset;
-    private final Long previousSize;
-
-    private final String sourceQueueIdentifier;
-
-    private final Map<String, String> previousAttributes;
-    private final Map<String, String> updatedAttributes;
-
-    private volatile long eventId;
-
-    /**
-     * Creates an event record from the values currently held by the given builder.
-     * The lineage identifiers and the parent/child UUID lists are copied so that
-     * continuing to use the builder afterwards cannot alter this record.
-     *
-     * @param builder the builder whose current field values populate this record
-     */
-    private StandardProvenanceEventRecord(final Builder builder) {
-        this.eventTime = builder.eventTime;
-        this.entryDate = builder.entryDate;
-        this.eventType = builder.eventType;
-        this.componentId = builder.componentId;
-        this.componentType = builder.componentType;
-        this.transitUri = builder.transitUri;
-        this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
-        this.uuid = builder.uuid;
-
-        // Copy the lists (keeping null as null) so that the builder's add/remove methods
-        // do not reach into an already-built record.
-        if (builder.parentUuids == null) {
-            this.parentUuids = null;
-        } else {
-            this.parentUuids = new ArrayList<String>(builder.parentUuids);
-        }
-        if (builder.childrenUuids == null) {
-            this.childrenUuids = null;
-        } else {
-            this.childrenUuids = new ArrayList<String>(builder.childrenUuids);
-        }
-
-        this.alternateIdentifierUri = builder.alternateIdentifierUri;
-        this.details = builder.details;
-        this.relationship = builder.relationship;
-        this.storageByteOffset = builder.storageByteOffset;
-        this.storageFilename = builder.storageFilename;
-        this.eventDuration = builder.eventDuration;
-        this.lineageStartDate = builder.lineageStartDate;
-
-        // Snapshot the identifiers; a null set in the builder is treated as "no identifiers"
-        // instead of failing inside Collections.unmodifiableSet.
-        if (builder.lineageIdentifiers == null) {
-            this.lineageIdentifiers = Collections.<String>emptySet();
-        } else {
-            this.lineageIdentifiers = Collections.unmodifiableSet(new HashSet<String>(builder.lineageIdentifiers));
-        }
-
-        previousClaimSection = builder.previousClaimSection;
-        previousClaimContainer = builder.previousClaimContainer;
-        previousClaimIdentifier = builder.previousClaimIdentifier;
-        previousClaimOffset = builder.previousClaimOffset;
-        previousSize = builder.previousSize;
-
-        contentClaimSection = builder.contentClaimSection;
-        contentClaimContainer = builder.contentClaimContainer;
-        contentClaimIdentifier = builder.contentClaimIdentifier;
-        contentClaimOffset = builder.contentClaimOffset;
-        // NOTE(review): builder.contentSize is a Long that is unboxed here, so a builder whose
-        // content size was never set fails with a NullPointerException - confirm the builder
-        // validates this before constructing the record.
-        contentSize = builder.contentSize;
-
-        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
-        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
-
-        sourceQueueIdentifier = builder.sourceQueueIdentifier;
-
-    }
-
-    /**
-     * @return the storage filename supplied by the builder; may be {@code null}
-     */
-    public String getStorageFilename() {
-        return this.storageFilename;
-    }
-
-    /**
-     * @return the storage byte offset supplied by the builder
-     */
-    public long getStorageByteOffset() {
-        return this.storageByteOffset;
-    }
-
-    /**
-     * Assigns the event ID; the backing field is volatile.
-     *
-     * @param eventId the ID to assign to this event
-     */
-    void setEventId(final long eventId) {
-        this.eventId = eventId;
-    }
-
-    /**
-     * @return the ID most recently assigned through {@code setEventId}
-     */
-    @Override
-    public long getEventId() {
-        return this.eventId;
-    }
-
-    /**
-     * @return the event time supplied by the builder
-     */
-    @Override
-    public long getEventTime() {
-        return this.eventTime;
-    }
-
-    /**
-     * @return the lineage identifiers as an unmodifiable set
-     */
-    @Override
-    public Set<String> getLineageIdentifiers() {
-        return this.lineageIdentifiers;
-    }
-
-    /**
-     * @return the lineage start date supplied by the builder
-     */
-    @Override
-    public long getLineageStartDate() {
-        return this.lineageStartDate;
-    }
-
-    /**
-     * @return the content size recorded for this event
-     */
-    @Override
-    public long getFileSize() {
-        return this.contentSize;
-    }
-
-    /**
-     * @return the previous content size; may be {@code null}
-     */
-    @Override
-    public Long getPreviousFileSize() {
-        return this.previousSize;
-    }
-
-    /**
-     * @return the type of this event
-     */
-    @Override
-    public ProvenanceEventType getEventType() {
-        return this.eventType;
-    }
-
-    /**
-     * Builds a new map holding the previous attributes overlaid with every updated
-     * attribute whose value is non-null. An updated attribute with a {@code null}
-     * value leaves any previous value for that key in place.
-     *
-     * @return a freshly created, mutable map of the merged attributes
-     */
-    @Override
-    public Map<String, String> getAttributes() {
-        final int capacity = previousAttributes.size() + updatedAttributes.size();
-        final Map<String, String> merged = new HashMap<>(capacity);
-        merged.putAll(previousAttributes);
-
-        for (final Map.Entry<String, String> updated : updatedAttributes.entrySet()) {
-            final String value = updated.getValue();
-            if (value == null) {
-                continue;
-            }
-            merged.put(updated.getKey(), value);
-        }
-
-        return merged;
-    }
-
-    /**
-     * @return the component ID supplied by the builder
-     */
-    @Override
-    public String getComponentId() {
-        return this.componentId;
-    }
-
-    /**
-     * @return the component type supplied by the builder
-     */
-    @Override
-    public String getComponentType() {
-        return this.componentType;
-    }
-
-    /**
-     * @return the transit URI; may be {@code null}
-     */
-    @Override
-    public String getTransitUri() {
-        return this.transitUri;
-    }
-
-    /**
-     * @return the source system's FlowFile identifier; may be {@code null}
-     */
-    @Override
-    public String getSourceSystemFlowFileIdentifier() {
-        return this.sourceSystemFlowFileIdentifier;
-    }
-
-    /**
-     * @return the FlowFile UUID supplied by the builder
-     */
-    @Override
-    public String getFlowFileUuid() {
-        return this.uuid;
-    }
-
-    /**
-     * @return the parent UUIDs, or an empty list when none were recorded; never {@code null}
-     */
-    @Override
-    public List<String> getParentUuids() {
-        if (parentUuids == null) {
-            return Collections.<String>emptyList();
-        }
-        return parentUuids;
-    }
-
-    /**
-     * @return the child UUIDs, or an empty list when none were recorded; never {@code null}
-     */
-    @Override
-    public List<String> getChildUuids() {
-        if (childrenUuids == null) {
-            return Collections.<String>emptyList();
-        }
-        return childrenUuids;
-    }
-
-    /**
-     * @return the alternate identifier URI; may be {@code null}
-     */
-    @Override
-    public String getAlternateIdentifierUri() {
-        return this.alternateIdentifierUri;
-    }
-
-    /**
-     * @return the event duration supplied by the builder
-     */
-    @Override
-    public long getEventDuration() {
-        return this.eventDuration;
-    }
-
-    /**
-     * @return the event details; may be {@code null}
-     */
-    @Override
-    public String getDetails() {
-        return this.details;
-    }
-
-    /**
-     * @return the relationship recorded for this event; may be {@code null}
-     */
-    @Override
-    public String getRelationship() {
-        return this.relationship;
-    }
-
-    /**
-     * @return the FlowFile entry date supplied by the builder
-     */
-    @Override
-    public long getFlowFileEntryDate() {
-        return this.entryDate;
-    }
-
-    /**
-     * @return the section of the current content claim; may be {@code null}
-     */
-    @Override
-    public String getContentClaimSection() {
-        return this.contentClaimSection;
-    }
-
-    /**
-     * @return the container of the current content claim; may be {@code null}
-     */
-    @Override
-    public String getContentClaimContainer() {
-        return this.contentClaimContainer;
-    }
-
-    /**
-     * @return the identifier of the current content claim; may be {@code null}
-     */
-    @Override
-    public String getContentClaimIdentifier() {
-        return this.contentClaimIdentifier;
-    }
-
-    /**
-     * @return the offset of the current content claim; may be {@code null}
-     */
-    @Override
-    public Long getContentClaimOffset() {
-        return this.contentClaimOffset;
-    }
-
-    /**
-     * @return the source queue identifier; may be {@code null}
-     */
-    @Override
-    public String getSourceQueueIdentifier() {
-        return this.sourceQueueIdentifier;
-    }
-
-    /**
-     * @return the previous attributes as an unmodifiable map; empty when none were supplied
-     */
-    @Override
-    public Map<String, String> getPreviousAttributes() {
-        return this.previousAttributes;
-    }
-
-    /**
-     * @return the container of the previous content claim; may be {@code null}
-     */
-    @Override
-    public String getPreviousContentClaimContainer() {
-        return this.previousClaimContainer;
-    }
-
-    /**
-     * @return the identifier of the previous content claim; may be {@code null}
-     */
-    @Override
-    public String getPreviousContentClaimIdentifier() {
-        return this.previousClaimIdentifier;
-    }
-
-    /**
-     * @return the offset of the previous content claim; may be {@code null}
-     */
-    @Override
-    public Long getPreviousContentClaimOffset() {
-        return this.previousClaimOffset;
-    }
-
-    /**
-     * @return the section of the previous content claim; may be {@code null}
-     */
-    @Override
-    public String getPreviousContentClaimSection() {
-        return this.previousClaimSection;
-    }
-
-    /**
-     * @return the updated attributes as an unmodifiable map; empty when none were supplied
-     */
-    @Override
-    public Map<String, String> getUpdatedAttributes() {
-        return this.updatedAttributes;
-    }
-
-    @Override
-    public int hashCode() {
-        final int eventTypeCode;
-        if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
-            eventTypeCode = 1472;
-        } else if (eventType == ProvenanceEventType.REPLAY) {
-            eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
-        } else {
-            eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
-        }
-
-        return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
-                + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
-    }
-
-    /**
-     * Two records are equal when they have the same event type, component ID, parent and
-     * child UUIDs (compared irrespective of order), FlowFile UUID, transit URI and
-     * relationship. Event IDs are compared only when both are greater than zero, and
-     * event times are compared only for REPLAY events.
-     */
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        // instanceof is false for null, so this also rejects a null argument.
-        if (!(obj instanceof StandardProvenanceEventRecord)) {
-            return false;
-        }
-
-        final StandardProvenanceEventRecord that = (StandardProvenanceEventRecord) obj;
-
-        // Event IDs take part in the comparison only once both have been populated.
-        final boolean bothIdsPopulated = eventId > 0L && that.getEventId() > 0L;
-        if (bothIdsPopulated && eventId != that.getEventId()) {
-            return false;
-        }
-
-        final boolean sameCoreFields = eventType == that.eventType
-                && componentId.equals(that.componentId)
-                && !different(parentUuids, that.parentUuids)
-                && !different(childrenUuids, that.childrenUuids)
-                && uuid.equals(that.uuid)
-                && !different(transitUri, that.transitUri)
-                && !different(relationship, that.relationship);
-        if (!sameCoreFields) {
-            return false;
-        }
-
-        // REPLAY events are additionally distinguished by their event time.
-        return eventType != ProvenanceEventType.REPLAY || eventTime == that.getEventTime();
-    }
-
-    /**
-     * Null-safe inequality check.
-     *
-     * @return {@code false} when both arguments are null or {@code a.equals(b)};
-     * {@code true} otherwise
-     */
-    private boolean different(final Object a, final Object b) {
-        if (a == null || b == null) {
-            // Differ unless both are null.
-            return a != b;
-        }
-
-        return !a.equals(b);
-    }
-
-    /**
-     * Order-insensitive inequality check for two lists of strings: sorted copies of the
-     * lists are compared element by element.
-     *
-     * @return {@code false} when both lists are null or hold the same elements
-     * irrespective of order; {@code true} otherwise
-     */
-    private boolean different(final List<String> a, final List<String> b) {
-        if (a == null || b == null) {
-            // Differ unless both are null.
-            return a != b;
-        }
-
-        final int count = a.size();
-        if (count != b.size()) {
-            return true;
-        }
-
-        final List<String> left = new ArrayList<>(a);
-        Collections.sort(left);
-        final List<String> right = new ArrayList<>(b);
-        Collections.sort(right);
-
-        int index = 0;
-        while (index < left.size()) {
-            if (!left.get(index).equals(right.get(index))) {
-                return true;
-            }
-            index++;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return a one-line description listing the event ID, type, time, FlowFile UUID,
-     * file size, component ID, transit URI, source system FlowFile identifier, parent
-     * UUIDs and alternate identifier URI
-     */
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder("ProvenanceEventRecord [");
-        text.append("eventId=").append(eventId);
-        text.append(", eventType=").append(eventType);
-        text.append(", eventTime=").append(new Date(eventTime));
-        text.append(", uuid=").append(uuid);
-        text.append(", fileSize=").append(contentSize);
-        text.append(", componentId=").append(componentId);
-        text.append(", transitUri=").append(transitUri);
-        text.append(", sourceSystemFlowFileIdentifier=").append(sourceSystemFlowFileIdentifier);
-        text.append(", parentUuids=").append(parentUuids);
-        text.append(", alternateIdentifierUri=").append(alternateIdentifierUri);
-        text.append("]");
-        return text.toString();
-    }
-
-    public static class Builder implements ProvenanceEventBuilder {
-
-        private long eventTime = System.currentTimeMillis();
-        private long entryDate;
-        private long lineageStartDate;
-        private Set<String> lineageIdentifiers = new HashSet<>();
-        private ProvenanceEventType eventType = null;
-        private String componentId = null;
-        private String componentType = null;
-        private String sourceSystemFlowFileIdentifier = null;
-        private String transitUri = null;
-        private String uuid = null;
-        private List<String> parentUuids = null;
-        private List<String> childrenUuids = null;
-        private String contentType = null;
-        private String alternateIdentifierUri = null;
-        private String details = null;
-        private String relationship = null;
-        private long storageByteOffset = -1L;
-        private long eventDuration = -1L;
-        private String storageFilename;
-
-        private String contentClaimSection;
-        private String contentClaimContainer;
-        private String contentClaimIdentifier;
-        private Long contentClaimOffset;
-        private Long contentSize;
-
-        private String previousClaimSection;
-        private String previousClaimContainer;
-        private String previousClaimIdentifier;
-        private Long previousClaimOffset;
-        private Long previousSize;
-
-        private String sourceQueueIdentifier;
-
-        private Map<String, String> previousAttributes;
-        private Map<String, String> updatedAttributes;
-
-        /**
-         * Populates this builder from the given event. The lineage identifiers and the
-         * parent/child UUID lists are copied into fresh mutable collections, so that the
-         * add/remove methods of this builder keep working (the event may expose
-         * unmodifiable collections) and never modify the source event. The storage
-         * location is copied only when the event is a
-         * {@code StandardProvenanceEventRecord}. The event ID is not copied.
-         *
-         * @param event the event whose values are copied
-         * @return this builder
-         */
-        @Override
-        public Builder fromEvent(final ProvenanceEventRecord event) {
-            eventTime = event.getEventTime();
-            entryDate = event.getFlowFileEntryDate();
-            lineageStartDate = event.getLineageStartDate();
-
-            final Set<String> eventLineageIds = event.getLineageIdentifiers();
-            lineageIdentifiers = eventLineageIds == null ? new HashSet<String>() : new HashSet<String>(eventLineageIds);
-
-            eventType = event.getEventType();
-            componentId = event.getComponentId();
-            componentType = event.getComponentType();
-            transitUri = event.getTransitUri();
-            sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
-            uuid = event.getFlowFileUuid();
-
-            final List<String> eventParents = event.getParentUuids();
-            parentUuids = eventParents == null ? null : new ArrayList<String>(eventParents);
-            final List<String> eventChildren = event.getChildUuids();
-            childrenUuids = eventChildren == null ? null : new ArrayList<String>(eventChildren);
-
-            alternateIdentifierUri = event.getAlternateIdentifierUri();
-            eventDuration = event.getEventDuration();
-            previousAttributes = event.getPreviousAttributes();
-            updatedAttributes = event.getUpdatedAttributes();
-            details = event.getDetails();
-            relationship = event.getRelationship();
-
-            contentClaimSection = event.getContentClaimSection();
-            contentClaimContainer = event.getContentClaimContainer();
-            contentClaimIdentifier = event.getContentClaimIdentifier();
-            contentClaimOffset = event.getContentClaimOffset();
-            contentSize = event.getFileSize();
-
-            previousClaimSection = event.getPreviousContentClaimSection();
-            previousClaimContainer = event.getPreviousContentClaimContainer();
-            previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
-            previousClaimOffset = event.getPreviousContentClaimOffset();
-            previousSize = event.getPreviousFileSize();
-
-            sourceQueueIdentifier = event.getSourceQueueIdentifier();
-
-            if (event instanceof StandardProvenanceEventRecord) {
-                final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
-                storageByteOffset = standardProvEvent.storageByteOffset;
-                storageFilename = standardProvEvent.storageFilename;
-            }
-
-            return this;
-        }
-
-        /**
-         * @param entryDate the FlowFile entry date to record
-         * @return this builder
-         */
-        @Override
-        public Builder setFlowFileEntryDate(final long entryDate) {
-            this.entryDate = entryDate;
-
-            return this;
-        }
-
-        /**
-         * Replaces the lineage identifiers with a copy of the given set. Copying keeps
-         * {@code addLineageIdentifier} from modifying the caller's set, or from failing
-         * when that set is unmodifiable. A {@code null} argument clears the identifiers.
-         *
-         * @param lineageIdentifiers the identifiers to record; may be {@code null}
-         * @return this builder
-         */
-        @Override
-        public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
-            this.lineageIdentifiers = lineageIdentifiers == null
-                    ? new HashSet<String>()
-                    : new HashSet<String>(lineageIdentifiers);
-            return this;
-        }
-
-        /**
-         * Stores references to the given maps; they are not copied.
-         *
-         * @param previousAttributes the attributes before the event
-         * @param updatedAttributes the attributes changed by the event
-         * @return this builder
-         */
-        @Override
-        public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
-            this.updatedAttributes = updatedAttributes;
-            this.previousAttributes = previousAttributes;
-
-            return this;
-        }
-
-        /**
-         * @param uuid the FlowFile UUID to record
-         * @return this builder
-         */
-        @Override
-        public Builder setFlowFileUUID(final String uuid) {
-            this.uuid = uuid;
-
-            return this;
-        }
-
-        /**
-         * @param filename the storage filename to record
-         * @param offset the storage byte offset to record
-         * @return this builder
-         */
-        public Builder setStorageLocation(final String filename, final long offset) {
-            this.storageByteOffset = offset;
-            this.storageFilename = filename;
-
-            return this;
-        }
-
-        /**
-         * Overrides the event time, which otherwise defaults to the time at which this
-         * builder was created.
-         *
-         * @param eventTime the event time to record
-         * @return this builder
-         */
-        @Override
-        public Builder setEventTime(long eventTime) {
-            this.eventTime = eventTime;
-
-            return this;
-        }
-
-        /**
-         * @param millis the event duration to record
-         * @return this builder
-         */
-        @Override
-        public Builder setEventDuration(final long millis) {
-            this.eventDuration = millis;
-
-            return this;
-        }
-
-        /**
-         * @param startDate the lineage start date to record
-         * @return this builder
-         */
-        @Override
-        public Builder setLineageStartDate(final long startDate) {
-            this.lineageStartDate = startDate;
-
-            return this;
-        }
-
-        /**
-         * Adds a single identifier to the lineage identifier set currently held by this builder.
-         * The set is modified in place, so this call fails if that set is {@code null} or does not
-         * support {@code add}.
-         *
-         * @param lineageIdentifier the identifier to add
-         * @return this builder, to allow call chaining
-         */
-        public Builder addLineageIdentifier(final String lineageIdentifier) {
-            this.lineageIdentifiers.add(lineageIdentifier);
-            return this;
-        }
-
-        /**
-         * Sets the type of the event. {@link #build()} requires it and uses it to decide which
-         * additional fields are mandatory.
-         *
-         * @param eventType the event type
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setEventType(final ProvenanceEventType eventType) {
-            this.eventType = eventType;
-            return this;
-        }
-
-        /**
-         * Sets the identifier of the component that the event is recorded for.
-         * {@link #build()} rejects a builder whose component ID is still {@code null}.
-         *
-         * @param componentId the component identifier
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setComponentId(final String componentId) {
-            this.componentId = componentId;
-            return this;
-        }
-
-        /**
-         * Sets the type of the component that the event is recorded for.
-         * {@link #build()} rejects a builder whose component type is still {@code null}.
-         *
-         * @param componentType the component type
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setComponentType(final String componentType) {
-            this.componentType = componentType;
-            return this;
-        }
-
-        /**
-         * Sets the identifier that the source system uses for the FlowFile.
-         *
-         * @param sourceSystemFlowFileIdentifier the source system's FlowFile identifier
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setSourceSystemFlowFileIdentifier(final String sourceSystemFlowFileIdentifier) {
-            this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
-            return this;
-        }
-
-        /**
-         * Sets the transit URI of the event. {@link #build()} requires it for RECEIVE and SEND events.
-         *
-         * @param transitUri the transit URI
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setTransitUri(final String transitUri) {
-            this.transitUri = transitUri;
-            return this;
-        }
-
-        /**
-         * Registers the given FlowFile as a parent of this event by recording its UUID attribute.
-         * The parent list is created on first use.
-         *
-         * @param parentFlowFile the parent FlowFile whose {@code CoreAttributes.UUID} attribute is recorded
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder addParentFlowFile(final FlowFile parentFlowFile) {
-            if (parentUuids == null) {
-                parentUuids = new ArrayList<>();
-            }
-
-            final String parentUuid = parentFlowFile.getAttribute(CoreAttributes.UUID.key());
-            parentUuids.add(parentUuid);
-            return this;
-        }
-
-        /**
-         * Registers the given UUID as a parent of this event. The parent list is created on first use.
-         *
-         * @param uuid the UUID of the parent FlowFile
-         * @return this builder, to allow call chaining
-         */
-        public Builder addParentUuid(final String uuid) {
-            if (parentUuids == null) {
-                parentUuids = new ArrayList<>();
-            }
-
-            parentUuids.add(uuid);
-            return this;
-        }
-
-        /**
-         * Removes the UUID of the given FlowFile from the recorded parents, if any parents have been
-         * recorded. Does nothing when no parent has been added yet.
-         *
-         * @param parentFlowFile the parent FlowFile whose {@code CoreAttributes.UUID} attribute is removed
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
-            if (parentUuids != null) {
-                final String parentUuid = parentFlowFile.getAttribute(CoreAttributes.UUID.key());
-                parentUuids.remove(parentUuid);
-            }
-
-            return this;
-        }
-
-        /**
-         * Registers the given FlowFile as a child of this event by recording its UUID attribute.
-         * The child list is created on first use.
-         *
-         * @param childFlowFile the child FlowFile whose {@code CoreAttributes.UUID} attribute is recorded
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder addChildFlowFile(final FlowFile childFlowFile) {
-            if (childrenUuids == null) {
-                childrenUuids = new ArrayList<>();
-            }
-
-            final String childUuid = childFlowFile.getAttribute(CoreAttributes.UUID.key());
-            childrenUuids.add(childUuid);
-            return this;
-        }
-
-        /**
-         * Registers the given UUID as a child of this event. The child list is created on first use.
-         *
-         * @param uuid the UUID of the child FlowFile
-         * @return this builder, to allow call chaining
-         */
-        public Builder addChildUuid(final String uuid) {
-            if (childrenUuids == null) {
-                childrenUuids = new ArrayList<>();
-            }
-
-            childrenUuids.add(uuid);
-            return this;
-        }
-
-        /**
-         * Removes the UUID of the given FlowFile from the recorded children, if any children have been
-         * recorded. Does nothing when no child has been added yet.
-         *
-         * @param childFlowFile the child FlowFile whose {@code CoreAttributes.UUID} attribute is removed
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder removeChildFlowFile(final FlowFile childFlowFile) {
-            if (childrenUuids != null) {
-                final String childUuid = childFlowFile.getAttribute(CoreAttributes.UUID.key());
-                childrenUuids.remove(childUuid);
-            }
-
-            return this;
-        }
-
-        /**
-         * Sets the content type recorded for the event.
-         *
-         * @param contentType the content type
-         * @return this builder, to allow call chaining
-         */
-        public Builder setContentType(final String contentType) {
-            this.contentType = contentType;
-            return this;
-        }
-
-        /**
-         * Sets the alternate identifier URI. {@link #build()} requires it for ADDINFO events.
-         *
-         * @param alternateIdentifierUri the alternate identifier URI
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setAlternateIdentifierUri(final String alternateIdentifierUri) {
-            this.alternateIdentifierUri = alternateIdentifierUri;
-            return this;
-        }
-
-        /**
-         * Sets the free-form details text of the event.
-         *
-         * @param details the event details
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setDetails(final String details) {
-            this.details = details;
-            return this;
-        }
-
-        /**
-         * Sets the relationship of the event from the name of the given {@code Relationship}.
-         * {@link #build()} requires a relationship for ROUTE events.
-         *
-         * @param relationship the relationship whose name is recorded; must not be {@code null}
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setRelationship(final Relationship relationship) {
-            this.relationship = relationship.getName();
-            return this;
-        }
-
-        /**
-         * Sets the relationship of the event directly from its name.
-         *
-         * @param relationship the relationship name, stored exactly as given
-         * @return this builder, to allow call chaining
-         */
-        public Builder setRelationship(final String relationship) {
-            this.relationship = relationship;
-            return this;
-        }
-
-        /**
-         * Populates this builder from the given FlowFile: entry date, lineage identifiers, lineage
-         * start date, attributes, UUID and content size. The previous attributes are set to an empty
-         * map and the FlowFile's current attributes become the updated attributes.
-         *
-         * @param flowFile the FlowFile to read values from
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
-            setFlowFileEntryDate(flowFile.getEntryDate());
-            setLineageIdentifiers(flowFile.getLineageIdentifiers());
-            setLineageStartDate(flowFile.getLineageStartDate());
-
-            final Map<String, String> noPreviousAttributes = Collections.emptyMap();
-            setAttributes(noPreviousAttributes, flowFile.getAttributes());
-
-            this.uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-            contentSize = flowFile.getSize();
-            return this;
-        }
-
-        /**
-         * Sets the location and size of the content that the FlowFile had before the event.
-         *
-         * @param container the content claim container
-         * @param section the content claim section
-         * @param identifier the content claim identifier
-         * @param offset the offset within the content claim
-         * @param size the size of the previous content
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
-            this.previousClaimContainer = container;
-            this.previousClaimSection = section;
-            this.previousClaimIdentifier = identifier;
-            this.previousClaimOffset = offset;
-            this.previousSize = size;
-            return this;
-        }
-
-        /**
-         * Sets the location and size of the content that the FlowFile has after the event.
-         *
-         * @param container the content claim container
-         * @param section the content claim section
-         * @param identifier the content claim identifier
-         * @param offset the offset within the content claim
-         * @param size the size of the current content
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
-            this.contentClaimContainer = container;
-            this.contentClaimSection = section;
-            this.contentClaimIdentifier = identifier;
-            this.contentClaimOffset = offset;
-            this.contentSize = size;
-            return this;
-        }
-
-        /**
-         * Sets the identifier of the source queue for the event.
-         *
-         * @param identifier the source queue identifier
-         * @return this builder, to allow call chaining
-         */
-        @Override
-        public Builder setSourceQueueIdentifier(final String identifier) {
-            this.sourceQueueIdentifier = identifier;
-            return this;
-        }
-
-        /**
-         * Verifies that a mandatory builder value has been supplied.
-         *
-         * @param value the value to check
-         * @param name the human-readable name of the value, used in the error message
-         * @throws IllegalStateException if {@code value} is {@code null}
-         */
-        private void assertSet(final Object value, final String name) {
-            if (value != null) {
-                return;
-            }
-
-            throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
-        }
-
-        /**
-         * Returns the event type currently configured on this builder.
-         *
-         * @return the event type, or {@code null} if none has been set
-         */
-        public ProvenanceEventType getEventType() {
-            return eventType;
-        }
-
-        /**
-         * Returns a read-only view of the child FlowFile UUIDs recorded so far.
-         *
-         * @return an unmodifiable list of child UUIDs; empty, never {@code null}, when no child has been recorded
-         */
-        public List<String> getChildUuids() {
-            // The list is created lazily by the add methods, so it may still be null here;
-            // wrapping null in unmodifiableList would throw a NullPointerException.
-            if (childrenUuids == null) {
-                return Collections.<String>emptyList();
-            }
-
-            return Collections.unmodifiableList(childrenUuids);
-        }
-
-        /**
-         * Returns a read-only view of the parent FlowFile UUIDs recorded so far.
-         *
-         * @return an unmodifiable list of parent UUIDs; empty, never {@code null}, when no parent has been recorded
-         */
-        public List<String> getParentUuids() {
-            // The list is created lazily by the add methods, so it may still be null here;
-            // wrapping null in unmodifiableList would throw a NullPointerException.
-            if (parentUuids == null) {
-                return Collections.<String>emptyList();
-            }
-
-            return Collections.unmodifiableList(parentUuids);
-        }
-
-        /**
-         * Validates the state of this builder and creates the event record.
-         * <p>
-         * Event type, component ID, component type, FlowFile UUID and FlowFile size are always
-         * required. In addition, ADDINFO events need an alternate identifier URI, RECEIVE and SEND
-         * events need a transit URI, ROUTE events need a relationship, and CLONE, FORK and JOIN events
-         * need at least one parent or child UUID.
-         *
-         * @return a new record built from this builder
-         * @throws IllegalStateException if a value required for the event type has not been set
-         */
-        @Override
-        public StandardProvenanceEventRecord build() {
-            // Checked in this order so that the first missing value is the one reported.
-            assertSet(eventType, "Event Type");
-            assertSet(componentId, "Component ID");
-            assertSet(componentType, "Component Type");
-            assertSet(uuid, "FlowFile UUID");
-            assertSet(contentSize, "FlowFile Size");
-
-            switch (eventType) {
-                case ADDINFO: {
-                    if (alternateIdentifierUri == null) {
-                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
-                    }
-                    break;
-                }
-                case RECEIVE:
-                case SEND: {
-                    assertSet(transitUri, "Transit URI");
-                    break;
-                }
-                case ROUTE: {
-                    assertSet(relationship, "Relationship");
-                    break;
-                }
-                case CLONE:
-                case FORK:
-                case JOIN: {
-                    final boolean hasParents = parentUuids != null && !parentUuids.isEmpty();
-                    final boolean hasChildren = childrenUuids != null && !childrenUuids.isEmpty();
-                    if (!hasParents && !hasChildren) {
-                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
-                    }
-                    break;
-                }
-                default:
-                    // No additional requirements for the remaining event types.
-                    break;
-            }
-
-            return new StandardProvenanceEventRecord(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
deleted file mode 100644
index 9a9a27d..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.provenance.search.Query;
-import org.apache.nifi.provenance.search.QueryResult;
-
-public class StandardQueryResult implements QueryResult {
-
-    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
-    private final Query query;
-    private final long creationNanos;
-
-    private final int numSteps;
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-
-    private final Lock writeLock = rwLock.writeLock();
-    // guarded by writeLock
-    private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
-    private long totalHitCount;
-    private int numCompletedSteps = 0;
-    private Date expirationDate;
-    private String error;
-    private long queryTime;
-
-    private volatile boolean canceled = false;
-
-    public StandardQueryResult(final Query query, final int numSteps) {
-        this.query = query;
-        this.numSteps = numSteps;
-        this.creationNanos = System.nanoTime();
-
-        updateExpiration();
-    }
-
-    @Override
-    public List<ProvenanceEventRecord> getMatchingEvents() {
-        readLock.lock();
-        try {
-            if (matchingRecords.size() <= query.getMaxResults()) {
-                return new ArrayList<>(matchingRecords);
-            }
-
-            final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
-            for (int i = 0; i < query.getMaxResults(); i++) {
-                copy.add(matchingRecords.get(i));
-            }
-
-            return copy;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public long getTotalHitCount() {
-        readLock.lock();
-        try {
-            return totalHitCount;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public long getQueryTime() {
-        return queryTime;
-    }
-
-    @Override
-    public Date getExpiration() {
-        return expirationDate;
-    }
-
-    @Override
-    public String getError() {
-        return error;
-    }
-
-    @Override
-    public int getPercentComplete() {
-        readLock.lock();
-        try {
-            return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public boolean isFinished() {
-        readLock.lock();
-        try {
-            return numCompletedSteps >= numSteps || canceled;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    void cancel() {
-        this.canceled = true;
-    }
-
-    public void setError(final String error) {
-        writeLock.lock();
-        try {
-            this.error = error;
-            numCompletedSteps++;
-
-            updateExpiration();
-            if (numCompletedSteps >= numSteps) {
-                final long searchNanos = System.nanoTime() - creationNanos;
-                queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
-        writeLock.lock();
-        try {
-            this.matchingRecords.addAll(matchingRecords);
-            this.totalHitCount += totalHits;
-
-            numCompletedSteps++;
-            updateExpiration();
-
-            if (numCompletedSteps >= numSteps) {
-                final long searchNanos = System.nanoTime() - creationNanos;
-                queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Must be called with write lock!
-     */
-    private void updateExpiration() {
-        expirationDate = new Date(System.currentTimeMillis() + TTL);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
deleted file mode 100644
index 0aaf5ef..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A {@link LineageEdge} that connects a source node to a destination node.
- * <p>
- * Equality and hash code are based on the source and destination only; the UUID is not part of
- * either.
- */
-public class EdgeNode implements LineageEdge {
-
-    private final String uuid;
-    private final LineageNode source;
-    private final LineageNode destination;
-
-    /**
-     * @param uuid the UUID associated with this edge; not validated
-     * @param source the node the edge starts at; must not be {@code null}
-     * @param destination the node the edge ends at; must not be {@code null}
-     */
-    public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) {
-        this.uuid = uuid;
-        this.source = requireNonNull(source);
-        this.destination = requireNonNull(destination);
-    }
-
-    @Override
-    public String getUuid() {
-        return uuid;
-    }
-
-    @Override
-    public LineageNode getSource() {
-        return source;
-    }
-
-    @Override
-    public LineageNode getDestination() {
-        return destination;
-    }
-
-    @Override
-    public int hashCode() {
-        final int sourceHash = source.hashCode();
-        final int destinationHash = destination.hashCode();
-        return 43298293 + sourceHash + destinationHash;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        // instanceof is false for null, so no separate null check is needed.
-        if (!(obj instanceof EdgeNode)) {
-            return false;
-        }
-
-        final EdgeNode that = (EdgeNode) obj;
-        if (!source.equals(that.source)) {
-            return false;
-        }
-        return destination.equals(that.destination);
-    }
-
-    @Override
-    public String toString() {
-        return "Edge[Source=" + source + ", Destination=" + destination + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
deleted file mode 100644
index 12d9a4f..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import java.util.List;
-
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-
-/**
- * A lineage node that wraps a single {@link ProvenanceEventRecord}.
- * <p>
- * Most accessors delegate to the wrapped record. Equality and hash code are based on the record
- * only; the cluster node identifier is not part of either.
- */
-public class EventNode implements ProvenanceEventLineageNode {
-
-    private final ProvenanceEventRecord record;
-    private String clusterNodeIdentifier = null;
-
-    /**
-     * @param event the provenance event that this node represents
-     */
-    public EventNode(final ProvenanceEventRecord event) {
-        this.record = event;
-    }
-
-    /**
-     * @return the event identifier rendered as a string
-     */
-    @Override
-    public String getIdentifier() {
-        final long eventId = getEventIdentifier();
-        return String.valueOf(eventId);
-    }
-
-    @Override
-    public String getClusterNodeIdentifier() {
-        return clusterNodeIdentifier;
-    }
-
-    public void setClusterNodeIdentifier(final String nodeIdentifier) {
-        clusterNodeIdentifier = nodeIdentifier;
-    }
-
-    @Override
-    public LineageNodeType getNodeType() {
-        return LineageNodeType.PROVENANCE_EVENT_NODE;
-    }
-
-    @Override
-    public ProvenanceEventType getEventType() {
-        return record.getEventType();
-    }
-
-    @Override
-    public long getTimestamp() {
-        return record.getEventTime();
-    }
-
-    @Override
-    public long getEventIdentifier() {
-        return record.getEventId();
-    }
-
-    /**
-     * @return the value of the {@code CoreAttributes.UUID} attribute of the wrapped record
-     */
-    @Override
-    public String getFlowFileUuid() {
-        final String uuidKey = CoreAttributes.UUID.key();
-        return record.getAttributes().get(uuidKey);
-    }
-
-    @Override
-    public List<String> getParentUuids() {
-        return record.getParentUuids();
-    }
-
-    @Override
-    public List<String> getChildUuids() {
-        return record.getChildUuids();
-    }
-
-    @Override
-    public int hashCode() {
-        return 2938472 + record.hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-
-        // instanceof is false for null, so no separate null check is needed.
-        if (obj instanceof EventNode) {
-            final EventNode that = (EventNode) obj;
-            return record.equals(that.record);
-        }
-
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
deleted file mode 100644
index c36c38d..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A {@link Lineage} backed by copies of the node and edge collections supplied at construction.
- * <p>
- * The getters return the internal lists themselves, not copies.
- */
-public class FlowFileLineage implements Lineage {
-
-    private final List<LineageNode> nodes;
-    private final List<LineageEdge> edges;
-
-    /**
-     * @param nodes the nodes of the lineage; copied, must not be {@code null}
-     * @param edges the edges of the lineage; copied, must not be {@code null}
-     */
-    public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) {
-        this.nodes = new ArrayList<>(requireNonNull(nodes));
-        this.edges = new ArrayList<>(requireNonNull(edges));
-    }
-
-    @Override
-    public List<LineageNode> getNodes() {
-        return nodes;
-    }
-
-    @Override
-    public List<LineageEdge> getEdges() {
-        return edges;
-    }
-
-    /**
-     * @return 923 plus the hash codes of all nodes and all edges
-     */
-    @Override
-    public int hashCode() {
-        return 923 + sumOfHashCodes(nodes) + sumOfHashCodes(edges);
-    }
-
-    /**
-     * Adds up the hash codes of all elements of the given collection.
-     */
-    private static int sumOfHashCodes(final Collection<?> elements) {
-        int total = 0;
-        for (final Object element : elements) {
-            total += element.hashCode();
-        }
-        return total;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        // instanceof is false for null, so no separate null check is needed.
-        if (!(obj instanceof FlowFileLineage)) {
-            return false;
-        }
-
-        final FlowFileLineage that = (FlowFileLineage) obj;
-        if (!nodes.equals(that.nodes)) {
-            return false;
-        }
-        return edges.equals(that.edges);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
deleted file mode 100644
index fdc7470..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A {@link LineageNode} that represents a FlowFile, identified by its UUID.
- * <p>
- * Equality and hash code are based on the FlowFile UUID only.
- */
-public class FlowFileNode implements LineageNode {
-
-    private final String flowFileUuid;
-    private final long creationTime;
-    private String clusterNodeIdentifier;
-
-    /**
-     * @param flowFileUuid the UUID of the FlowFile; must not be {@code null}
-     * @param flowFileCreationTime the creation time of the FlowFile, reported by {@link #getTimestamp()}
-     */
-    public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) {
-        this.flowFileUuid = requireNonNull(flowFileUuid);
-        this.creationTime = flowFileCreationTime;
-    }
-
-    /**
-     * @return the FlowFile UUID, which doubles as the identifier of this node
-     */
-    @Override
-    public String getIdentifier() {
-        return flowFileUuid;
-    }
-
-    @Override
-    public long getTimestamp() {
-        return creationTime;
-    }
-
-    @Override
-    public String getClusterNodeIdentifier() {
-        return clusterNodeIdentifier;
-    }
-
-    @Override
-    public LineageNodeType getNodeType() {
-        return LineageNodeType.FLOWFILE_NODE;
-    }
-
-    @Override
-    public String getFlowFileUuid() {
-        return flowFileUuid;
-    }
-
-    @Override
-    public int hashCode() {
-        return 23498723 + flowFileUuid.hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        // instanceof is false for null, so no separate null check is needed.
-        if (obj instanceof FlowFileNode) {
-            final FlowFileNode that = (FlowFileNode) obj;
-            return flowFileUuid.equals(that.flowFileUuid);
-        }
-
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        return "FlowFile[UUID=" + flowFileUuid + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/pom.xml b/nifi/commons/flowfile-packager/pom.xml
deleted file mode 100644
index 6e8d58d..0000000
--- a/nifi/commons/flowfile-packager/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons-parent</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>flowfile-packager</artifactId>
-    <!-- Same value as the parent version declared above. -->
-    <version>0.0.1-incubating-SNAPSHOT</version>
-    <packaging>jar</packaging>
-
-    <name>FlowFile Packager</name>
-
-    <!-- No versions are declared on these dependencies; presumably they are managed by nifi-commons-parent (TODO confirm). -->
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
-    </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
deleted file mode 100644
index ae16f99..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * Contract for writing a FlowFile, given as a content stream plus attributes, to an output stream.
- * The output format is defined by the implementing class.
- */
-public interface FlowFilePackager {
-
-    /**
-     * Packages a single FlowFile.
-     *
-     * @param in the stream to read from; presumably the FlowFile content (confirm against implementations)
-     * @param out the stream that the packaged form is written to
-     * @param attributes the attributes to package
-     * @param fileSize the file size; presumably the number of content bytes available from {@code in} (confirm against implementations)
-     * @throws IOException declared for failures while packaging, e.g. when reading or writing the streams
-     */
-    void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
deleted file mode 100644
index 07baab1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.lang3.StringEscapeUtils;
-
-/**
- * Packages a FlowFile as a tar archive with two entries: an XML properties
- * document named {@link #FILENAME_ATTRIBUTES} that holds the attributes,
- * followed by an entry named {@link #FILENAME_CONTENT} that holds the content.
- */
-public class FlowFilePackagerV1 implements FlowFilePackager {
-
-    public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
-    public static final String FILENAME_CONTENT = "flowfile.content";
-    public static final int DEFAULT_TAR_PERMISSIONS = 0644;
-
-    private final int tarPermissions;
-
-    /**
-     * Creates a packager whose tar entries use {@link #DEFAULT_TAR_PERMISSIONS}.
-     */
-    public FlowFilePackagerV1() {
-        this(DEFAULT_TAR_PERMISSIONS);
-    }
-
-    /**
-     * @param tarPermissions mode applied to both tar entries
-     */
-    public FlowFilePackagerV1(final int tarPermissions) {
-        this.tarPermissions = tarPermissions;
-    }
-
-    /**
-     * Writes the attributes entry and then the content entry to a tar stream
-     * layered over {@code out}. The tar stream is closed before this method
-     * returns.
-     *
-     * @param in stream supplying the FlowFile content
-     * @param out stream that receives the tar archive
-     * @param attributes the FlowFile attributes; {@code null} is treated as no attributes
-     * @param fileSize size recorded in the header of the content entry
-     * @throws IOException if reading the content or writing the archive fails
-     */
-    @Override
-    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
-        // try-with-resources closes the tar stream, so no explicit flush()/close() is needed
-        try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) {
-            writeAttributesEntry(attributes, tout);
-            writeContentEntry(tout, in, fileSize);
-            tout.finish();
-        }
-    }
-
-    /**
-     * Serializes the attributes as an XML properties document (UTF-8) and
-     * writes it as the {@link #FILENAME_ATTRIBUTES} tar entry.
-     */
-    private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n  SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n");
-        sb.append("<properties>");
-        // A null map yields an empty <properties> document, matching FlowFilePackagerV3's
-        // treatment of null attributes as "no attributes".
-        if (attributes != null) {
-            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                final String escapedKey = StringEscapeUtils.escapeXml11(entry.getKey());
-                final String escapedValue = StringEscapeUtils.escapeXml11(entry.getValue());
-                sb.append("\n  <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>");
-            }
-        }
-        sb.append("</properties>");
-
-        final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
-        final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES);
-        attribEntry.setMode(tarPermissions);
-        attribEntry.setSize(metaBytes.length);
-        tout.putArchiveEntry(attribEntry);
-        tout.write(metaBytes);
-        tout.closeArchiveEntry();
-    }
-
-    /**
-     * Writes the {@link #FILENAME_CONTENT} tar entry, declaring {@code fileSize}
-     * as its size and streaming {@code inStream} into it until end of stream.
-     */
-    private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException {
-        final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT);
-        entry.setMode(tarPermissions);
-        entry.setSize(fileSize);
-        tarOut.putArchiveEntry(entry);
-
-        final byte[] buffer = new byte[512 << 10]; // 512KB
-        int bytesRead;
-        // A zero-length read is skipped rather than treated as end of stream.
-        while ((bytesRead = inStream.read(buffer)) != -1) {
-            if (bytesRead > 0) {
-                tarOut.write(buffer, 0, bytesRead);
-            }
-        }
-        // The stream is fully drained at this point; no second copy pass is required.
-        tarOut.closeArchiveEntry();
-    }
-
-    /**
-     * Copies bytes from {@code source} to {@code destination} until a read
-     * returns zero or a negative count.
-     *
-     * @param source stream to read from
-     * @param destination stream to write to
-     * @return the total number of bytes copied
-     * @throws IOException if reading or writing fails
-     */
-    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
-        final byte[] buffer = new byte[8192];
-        int len;
-        long totalCount = 0L;
-        while ((len = source.read(buffer)) > 0) {
-            destination.write(buffer, 0, len);
-            totalCount += len;
-        }
-        return totalCount;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
deleted file mode 100644
index 6f9d6b1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * <p>
- * Packages a FlowFile, including both its content and its attributes, into a
- * single file that is stream-friendly. The encoding scheme is as follows:
- * </p>
- *
- * <pre>
- * Length Field : indicates the number of Flow File Attributes in the stream
- * 1 to N times (N=number of Flow File Attributes):
- *      String Field : Flow File Attribute key name
- *      String Field : Flow File Attribute value
- * Long : 8 bytes indicating the length of the Flow File content
- * Content : The next M bytes are the content of the Flow File.
- * </pre>
- *
- * <pre>
- * Encoding of String Field is as follows:
- *      Length Field : indicates the length of the String
- *      1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value.
- * </pre>
- *
- * <pre>
- * Encoding of Length Field is as follows:
- *      First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is
- *                     greater than or equal to 65535; therefore, the next 4 bytes will indicate the actual length.
- * </pre>
- *
- * <p>
- * Note: All byte-order encoding is Network Byte Order (Most Significant Byte
- * first)
- * </p>
- *
- * <p>
- * The following example shows the bytes expected if we were to encode a
- * FlowFile containing the following attributes where the content is the text
- * "Hello World!":
- * </p>
- *
- * Attributes:
- * <pre>
- * +-------+-------+
- * | Key   + Value |
- * + --------------+
- * | A     | a     |
- * + --------------+
- * | B     | b     |
- * + --------------+
- * </pre> Content:<br>
- * Hello World!
- * <br><br>
- * Packaged Byte Encoding (In Hexadecimal Form):
- *
- * <pre>
- * 00 02 00 01 41 00 01 61
- * 00 01 42 00 01 62 00 00
- * 00 00 00 00 00 0C 48 65
- * 6C 6C 6F 20 57 6F 72 6C
- * 64 21
- * </pre>
- *
- * <p>
- * Instances reuse a single scratch buffer for every length and long field
- * they write, so one instance must not be used by several threads at once.
- * </p>
- */
-public class FlowFilePackagerV2 implements FlowFilePackager {
-
-    // Lengths below this value use the 2-byte form; 0xFFFF itself is reserved as the
-    // marker that introduces the 4-byte form.
-    private static final int MAX_VALUE_2_BYTES = 65535;
-    private final byte[] writeBuffer = new byte[8];
-
-    /**
-     * Writes the attribute count, each attribute key/value pair, the declared
-     * content length and finally the content itself to {@code out}.
-     *
-     * @param in stream supplying the FlowFile content
-     * @param out stream that receives the packaged FlowFile; it is not closed here
-     * @param attributes the FlowFile attributes; {@code null} is written as zero attributes
-     * @param fileSize value written as the 8-byte content length
-     * @throws IOException if reading the content or writing the package fails
-     */
-    @Override
-    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
-        if (attributes == null) {
-            // Same handling as FlowFilePackagerV3: a missing map is encoded as an empty one.
-            writeFieldLength(out, 0);
-        } else {
-            writeFieldLength(out, attributes.size()); //write out the number of attributes
-            for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
-                writeString(entry.getKey(), out);
-                writeString(entry.getValue(), out);
-            }
-        }
-        writeLong(out, fileSize);//write out length of data
-        copy(in, out);//write out the actual flow file payload
-    }
-
-    /**
-     * Copies {@code in} to {@code out} until a read returns zero or a negative count.
-     */
-    private void copy(final InputStream in, final OutputStream out) throws IOException {
-        final byte[] buffer = new byte[65536];
-        int len;
-        while ((len = in.read(buffer)) > 0) {
-            out.write(buffer, 0, len);
-        }
-    }
-
-    /**
-     * Writes a String Field: the length of the UTF-8 encoding followed by the encoded bytes.
-     */
-    private void writeString(final String val, final OutputStream out) throws IOException {
-        final byte[] bytes = val.getBytes("UTF-8");
-        writeFieldLength(out, bytes.length);
-        out.write(bytes);
-    }
-
-    /**
-     * Writes a Length Field in big-endian order, using 2 bytes for small values
-     * and the 0xFFFF marker followed by 4 bytes otherwise.
-     */
-    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
-        // If the value is less than the max value that can be fit into 2 bytes, just use the
-        // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
-        // 4 bytes to indicate the real length.
-        if (numBytes < MAX_VALUE_2_BYTES) {
-            writeBuffer[0] = (byte) (numBytes >>> 8);
-            writeBuffer[1] = (byte) (numBytes);
-            out.write(writeBuffer, 0, 2);
-        } else {
-            writeBuffer[0] = (byte) 0xff;
-            writeBuffer[1] = (byte) 0xff;
-            writeBuffer[2] = (byte) (numBytes >>> 24);
-            writeBuffer[3] = (byte) (numBytes >>> 16);
-            writeBuffer[4] = (byte) (numBytes >>> 8);
-            writeBuffer[5] = (byte) (numBytes);
-            out.write(writeBuffer, 0, 6);
-        }
-    }
-
-    /**
-     * Writes {@code val} as 8 bytes, most significant byte first.
-     */
-    private void writeLong(final OutputStream out, final long val) throws IOException {
-        writeBuffer[0] = (byte) (val >>> 56);
-        writeBuffer[1] = (byte) (val >>> 48);
-        writeBuffer[2] = (byte) (val >>> 40);
-        writeBuffer[3] = (byte) (val >>> 32);
-        writeBuffer[4] = (byte) (val >>> 24);
-        writeBuffer[5] = (byte) (val >>> 16);
-        writeBuffer[6] = (byte) (val >>> 8);
-        writeBuffer[7] = (byte) (val);
-        out.write(writeBuffer, 0, 8);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
deleted file mode 100644
index 181f3e3..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * Packages a FlowFile as: the {@link #MAGIC_HEADER} bytes, a length field
- * holding the attribute count, each attribute key and value as a
- * length-prefixed UTF-8 string, the content length as an 8-byte big-endian
- * long, and finally the content bytes.
- */
-public class FlowFilePackagerV3 implements FlowFilePackager {
-
-    public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
-    private static final int MAX_VALUE_2_BYTES = 65535;
-    private final byte[] writeBuffer = new byte[8];
-
-    /**
-     * Writes one packaged FlowFile to {@code out}.
-     *
-     * @param in stream supplying the FlowFile content
-     * @param out stream that receives the packaged FlowFile
-     * @param attributes the FlowFile attributes; {@code null} is written as zero attributes
-     * @param fileSize value written as the 8-byte content length
-     * @throws IOException if reading the content or writing the package fails
-     */
-    @Override
-    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
-        out.write(MAGIC_HEADER);
-
-        // attribute count first, then every key/value pair
-        final int attributeCount = (attributes == null) ? 0 : attributes.size();
-        writeFieldLength(out, attributeCount);
-        if (attributes != null) {
-            for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
-                writeString(attribute.getKey(), out);
-                writeString(attribute.getValue(), out);
-            }
-        }
-
-        // declared content length, followed by the payload itself
-        writeLong(out, fileSize);
-        copy(in, out);
-    }
-
-    /**
-     * Pumps {@code in} into {@code out}, stopping at the first read that
-     * yields no bytes.
-     */
-    private void copy(final InputStream in, final OutputStream out) throws IOException {
-        final byte[] chunk = new byte[65536];
-        for (int count = in.read(chunk); count > 0; count = in.read(chunk)) {
-            out.write(chunk, 0, count);
-        }
-    }
-
-    /**
-     * Emits the UTF-8 form of {@code val}, preceded by its byte length.
-     */
-    private void writeString(final String val, final OutputStream out) throws IOException {
-        final byte[] encoded = val.getBytes("UTF-8");
-        writeFieldLength(out, encoded.length);
-        out.write(encoded);
-    }
-
-    /**
-     * Emits a length field. Values at or above {@code MAX_VALUE_2_BYTES} are
-     * written as two 0xFF marker bytes plus a 4-byte big-endian value; smaller
-     * values are written directly as 2 big-endian bytes.
-     */
-    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
-        if (numBytes >= MAX_VALUE_2_BYTES) {
-            // long form: 255/255 marker, then the real length in 4 bytes
-            writeBuffer[0] = (byte) 0xff;
-            writeBuffer[1] = (byte) 0xff;
-            for (int i = 0; i < 4; i++) {
-                writeBuffer[2 + i] = (byte) (numBytes >>> (24 - 8 * i));
-            }
-            out.write(writeBuffer, 0, 6);
-            return;
-        }
-
-        // short form: the value itself fits into 2 bytes
-        writeBuffer[0] = (byte) (numBytes >>> 8);
-        writeBuffer[1] = (byte) (numBytes);
-        out.write(writeBuffer, 0, 2);
-    }
-
-    /**
-     * Emits {@code val} as 8 bytes, most significant byte first.
-     */
-    private void writeLong(final OutputStream out, final long val) throws IOException {
-        for (int i = 0; i < 8; i++) {
-            writeBuffer[i] = (byte) (val >>> (56 - 8 * i));
-        }
-        out.write(writeBuffer, 0, 8);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
deleted file mode 100644
index fd9d92d..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * Contract for reading a packaged FlowFile from an input stream and splitting
- * it back into its content and its attributes.
- */
-public interface FlowFileUnpackager {
-
-    /**
-     * Unpackages one FlowFile.
-     *
-     * @param in stream supplying the packaged FlowFile
-     * @param out stream that receives the FlowFile content
-     * @return the FlowFile attributes
-     *         (NOTE(review): the V1 implementation returns {@code null} when
-     *         the input holds no entries - confirm whether callers must handle that)
-     * @throws IOException if reading the package or writing the content fails
-     */
-    Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException;
-
-    /**
-     * @return whether another FlowFile can be unpackaged
-     *         (NOTE(review): presumably refers to the stream most recently
-     *         passed to {@link #unpackageFlowFile} - confirm)
-     * @throws IOException if the check requires I/O and that I/O fails
-     */
-    boolean hasMoreData() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
deleted file mode 100644
index f8ef3d1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-
-/**
- * Unpackages a FlowFile from the tar layout written by
- * {@link FlowFilePackagerV1}: an attributes entry followed by a content entry.
- */
-public class FlowFileUnpackagerV1 implements FlowFileUnpackager {
-
-    // Number of unpackageFlowFile calls made on this instance.
-    private int flowFilesRead = 0;
-
-    /**
-     * Reads the attributes entry and then copies the content entry to {@code out}.
-     *
-     * @param in stream supplying the tar archive
-     * @param out stream that receives the content; it is flushed but not closed
-     * @return the attributes, or {@code null} if the archive has no entries
-     * @throws IOException if the first entry is not the attributes entry, the
-     *         second entry is missing or is not the content entry, or I/O fails
-     */
-    @Override
-    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
-        flowFilesRead++;
-        // NOTE(review): tarIn is never closed here; presumably so that the caller
-        // keeps ownership of 'in' - confirm.
-        final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);
-        final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
-        if (attribEntry == null) {
-            return null;
-        }
-
-        if (!attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
-            throw new IOException("Expected two tar entries: "
-                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
-                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
-        }
-        final Map<String, String> attributes = getAttributes(tarIn);
-
-        final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();
-        if (contentEntry == null || !contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
-            throw new IOException("Expected two tar entries: "
-                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
-                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
-        }
-
-        final byte[] buffer = new byte[512 << 10]; // 512KB
-        int bytesRead;
-        // A zero-length read is skipped rather than treated as end of entry.
-        while ((bytesRead = tarIn.read(buffer)) != -1) {
-            if (bytesRead > 0) {
-                out.write(buffer, 0, bytesRead);
-            }
-        }
-        out.flush();
-
-        return attributes;
-    }
-
-    /**
-     * Parses the current tar entry as an XML properties document and returns
-     * its entries as a map.
-     *
-     * @param stream tar stream positioned at the attributes entry
-     * @return the attributes read from the entry
-     * @throws IOException if the document cannot be read, or if a key or a
-     *         value is not a {@code String}
-     */
-    protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException {
-
-        final Properties props = new Properties();
-        // The wrapper ignores close(), so the tar stream stays open for the content entry.
-        props.loadFromXML(new NonCloseableInputStream(stream));
-
-        final Map<String, String> result = new HashMap<>();
-        for (final Entry<Object, Object> entry : props.entrySet()) {
-            final Object keyObject = entry.getKey();
-            final Object valueObject = entry.getValue();
-            if (!(keyObject instanceof String)) {
-                throw new IOException("Flow file attributes object contains key of type "
-                        + keyObject.getClass().getCanonicalName()
-                        + " but expected java.lang.String");
-            } else if (!(valueObject instanceof String)) {
-                // Check and report the value here; the key was already validated above.
-                throw new IOException("Flow file attributes object contains value of type "
-                        + valueObject.getClass().getCanonicalName()
-                        + " but expected java.lang.String");
-            }
-
-            final String key = (String) keyObject;
-            final String value = (String) valueObject;
-            result.put(key, value);
-        }
-
-        return result;
-    }
-
-    /**
-     * @return {@code true} only while {@link #unpackageFlowFile} has not yet
-     *         been called on this instance
-     */
-    @Override
-    public boolean hasMoreData() throws IOException {
-        return flowFilesRead == 0;
-    }
-
-    /**
-     * Input stream wrapper that forwards every operation to the wrapped stream
-     * except {@link #close()}, which does nothing.
-     */
-    public static final class NonCloseableInputStream extends InputStream {
-
-        final InputStream stream;
-
-        public NonCloseableInputStream(final InputStream stream) {
-            this.stream = stream;
-        }
-
-        /** Intentionally a no-op: the wrapped stream is left open. */
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public int read() throws IOException {
-            return stream.read();
-        }
-
-        @Override
-        public int available() throws IOException {
-            return stream.available();
-        }
-
-        @Override
-        public synchronized void mark(int readlimit) {
-            stream.mark(readlimit);
-        }
-
-        @Override
-        public synchronized void reset() throws IOException {
-            stream.reset();
-        }
-
-        @Override
-        public boolean markSupported() {
-            return stream.markSupported();
-        }
-
-        @Override
-        public long skip(long n) throws IOException {
-            return stream.skip(n);
-        }
-
-        @Override
-        public int read(byte[] b, int off, int len) throws IOException {
-            return stream.read(b, off, len);
-        }
-
-        @Override
-        public int read(byte[] b) throws IOException {
-            return stream.read(b);
-        }
-    }
-}