You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:56 UTC
[48/51] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
deleted file mode 100644
index afb56e8..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.nifi.provenance.lineage.ComputeLineageResult;
-import org.apache.nifi.provenance.lineage.EdgeNode;
-import org.apache.nifi.provenance.lineage.EventNode;
-import org.apache.nifi.provenance.lineage.FlowFileNode;
-import org.apache.nifi.provenance.lineage.LineageEdge;
-import org.apache.nifi.provenance.lineage.LineageNode;
-
-/**
- *
- */
-public class StandardLineageResult implements ComputeLineageResult {
-
- public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
- private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
-
- private final Collection<String> flowFileUuids;
- private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
- private final Set<LineageNode> nodes = new HashSet<>();
- private final Set<LineageEdge> edges = new HashSet<>();
- private final int numSteps;
- private final long creationNanos;
- private long computationNanos;
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- private Date expirationDate = null;
- private String error = null;
- private int numCompletedSteps = 0;
-
- private volatile boolean canceled = false;
-
- public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
- this.numSteps = numSteps;
- this.creationNanos = System.nanoTime();
- this.flowFileUuids = flowFileUuids;
-
- updateExpiration();
- }
-
- @Override
- public List<LineageNode> getNodes() {
- readLock.lock();
- try {
- return new ArrayList<>(nodes);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public List<LineageEdge> getEdges() {
- readLock.lock();
- try {
- return new ArrayList<>(edges);
- } finally {
- readLock.unlock();
- }
- }
-
- public int getNumberOfEdges() {
- readLock.lock();
- try {
- return edges.size();
- } finally {
- readLock.unlock();
- }
- }
-
- public int getNumberOfNodes() {
- readLock.lock();
- try {
- return nodes.size();
- } finally {
- readLock.unlock();
- }
- }
-
- public long getComputationTime(final TimeUnit timeUnit) {
- readLock.lock();
- try {
- return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public Date getExpiration() {
- readLock.lock();
- try {
- return expirationDate;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public String getError() {
- readLock.lock();
- try {
- return error;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public int getPercentComplete() {
- readLock.lock();
- try {
- return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public boolean isFinished() {
- readLock.lock();
- try {
- return numCompletedSteps >= numSteps || canceled;
- } finally {
- readLock.unlock();
- }
- }
-
- public void setError(final String error) {
- writeLock.lock();
- try {
- this.error = error;
- numCompletedSteps++;
-
- updateExpiration();
-
- if (numCompletedSteps >= numSteps) {
- computationNanos = System.nanoTime() - creationNanos;
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void update(final Collection<ProvenanceEventRecord> records) {
- writeLock.lock();
- try {
- relevantRecords.addAll(records);
-
- numCompletedSteps++;
- updateExpiration();
-
- if (numCompletedSteps >= numSteps && error == null) {
- computeLineage();
- computationNanos = System.nanoTime() - creationNanos;
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Computes the lineage from the relevant Provenance Event Records. This
- * method must be called with the write lock held and is only going to be
- * useful after all of the records have been successfully obtained
- */
- private void computeLineage() {
- final long startNanos = System.nanoTime();
-
- nodes.clear();
- edges.clear();
-
- Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile
- final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
- Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() {
- @Override
- public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
- // Sort on Event Time, then Event ID.
- final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
- if (eventTimeComparison == 0) {
- return Long.compare(o1.getEventId(), o2.getEventId());
- } else {
- return eventTimeComparison;
- }
- }
- });
-
- // convert the StandardProvenanceRecord objects into Lineage nodes (FlowFileNode, EventNodes).
- for (final ProvenanceEventRecord record : sortedRecords) {
- final LineageNode lineageNode = new EventNode(record);
- final boolean added = nodes.add(lineageNode);
- if (!added) {
- logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode);
- }
-
- // Create an edge that connects this node to the previous node for the same FlowFile UUID.
- final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid());
- if (lastNode != null) {
- // We calculate the Edge UUID based on whether or not this event is a SPAWN.
- // If this event is a SPAWN, then we want to use the previous node's UUID because a
- // SPAWN Event's UUID is not necessarily what we want, since a SPAWN Event's UUID pertains to
- // only one of (potentially) many UUIDs associated with the event. Otherwise, we know that
- // the UUID of this record is appropriate, so we just use it.
- final String edgeUuid;
-
- switch (record.getEventType()) {
- case JOIN:
- case CLONE:
- case REPLAY:
- edgeUuid = lastNode.getFlowFileUuid();
- break;
- default:
- edgeUuid = record.getFlowFileUuid();
- break;
- }
-
- edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode));
- }
-
- lastEventMap.put(record.getFlowFileUuid(), lineageNode);
-
- switch (record.getEventType()) {
- case FORK:
- case JOIN:
- case REPLAY:
- case CLONE: {
- // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate
- for (final String childUuid : record.getChildUuids()) {
- if (flowFileUuids.contains(childUuid)) {
- final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime());
- final boolean isNewFlowFile = nodes.add(childNode);
- if (!isNewFlowFile) {
- final String msg = "Unable to generate Lineage Graph because multiple events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")";
- logger.error(msg);
- setError(msg);
- return;
- }
-
- edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode));
- lastEventMap.put(childUuid, childNode);
- }
- }
- for (final String parentUuid : record.getParentUuids()) {
- LineageNode lastNodeForParent = lastEventMap.get(parentUuid);
- if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) {
- edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode));
- }
-
- lastEventMap.put(parentUuid, lineageNode);
- }
- }
- break;
- case RECEIVE:
- case CREATE: {
- // for a receive event, we want to create a FlowFile Node that represents the FlowFile received
- // and create an edge from the Receive Event to the FlowFile Node
- final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime());
- final boolean isNewFlowFile = nodes.add(flowFileNode);
- if (!isNewFlowFile) {
- final String msg = "Found cycle in graph. This indicates that multiple events were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")";
- setError(msg);
- logger.error(msg);
- return;
- }
- edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode));
- lastEventMap.put(record.getFlowFileUuid(), flowFileNode);
- }
- break;
- default:
- break;
- }
- }
-
- final long nanos = System.nanoTime() - startNanos;
- logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos));
- }
-
- void cancel() {
- this.canceled = true;
- }
-
- /**
- * Must be called with write lock!
- */
- private void updateExpiration() {
- expirationDate = new Date(System.currentTimeMillis() + TTL);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
deleted file mode 100644
index cfbae88..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.Relationship;
-
-/**
- * Holder for provenance relevant information
- * <p/>
- * @author none
- */
-public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {
-
- private final long eventTime;
- private final long entryDate;
- private final ProvenanceEventType eventType;
- private final long lineageStartDate;
- private final Set<String> lineageIdentifiers;
- private final String componentId;
- private final String componentType;
- private final String transitUri;
- private final String sourceSystemFlowFileIdentifier;
- private final String uuid;
- private final List<String> parentUuids;
- private final List<String> childrenUuids;
- private final String alternateIdentifierUri;
- private final String details;
- private final String relationship;
- private final long storageByteOffset;
- private final String storageFilename;
- private final long eventDuration;
-
- private final String contentClaimSection;
- private final String contentClaimContainer;
- private final String contentClaimIdentifier;
- private final Long contentClaimOffset;
- private final long contentSize;
-
- private final String previousClaimSection;
- private final String previousClaimContainer;
- private final String previousClaimIdentifier;
- private final Long previousClaimOffset;
- private final Long previousSize;
-
- private final String sourceQueueIdentifier;
-
- private final Map<String, String> previousAttributes;
- private final Map<String, String> updatedAttributes;
-
- private volatile long eventId;
-
- private StandardProvenanceEventRecord(final Builder builder) {
- this.eventTime = builder.eventTime;
- this.entryDate = builder.entryDate;
- this.eventType = builder.eventType;
- this.componentId = builder.componentId;
- this.componentType = builder.componentType;
- this.transitUri = builder.transitUri;
- this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
- this.uuid = builder.uuid;
- this.parentUuids = builder.parentUuids;
- this.childrenUuids = builder.childrenUuids;
- this.alternateIdentifierUri = builder.alternateIdentifierUri;
- this.details = builder.details;
- this.relationship = builder.relationship;
- this.storageByteOffset = builder.storageByteOffset;
- this.storageFilename = builder.storageFilename;
- this.eventDuration = builder.eventDuration;
- this.lineageStartDate = builder.lineageStartDate;
- this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers);
-
- previousClaimSection = builder.previousClaimSection;
- previousClaimContainer = builder.previousClaimContainer;
- previousClaimIdentifier = builder.previousClaimIdentifier;
- previousClaimOffset = builder.previousClaimOffset;
- previousSize = builder.previousSize;
-
- contentClaimSection = builder.contentClaimSection;
- contentClaimContainer = builder.contentClaimContainer;
- contentClaimIdentifier = builder.contentClaimIdentifier;
- contentClaimOffset = builder.contentClaimOffset;
- contentSize = builder.contentSize;
-
- previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
- updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
-
- sourceQueueIdentifier = builder.sourceQueueIdentifier;
-
- }
-
- public String getStorageFilename() {
- return storageFilename;
- }
-
- public long getStorageByteOffset() {
- return storageByteOffset;
- }
-
- void setEventId(final long eventId) {
- this.eventId = eventId;
- }
-
- @Override
- public long getEventId() {
- return eventId;
- }
-
- @Override
- public long getEventTime() {
- return eventTime;
- }
-
- @Override
- public Set<String> getLineageIdentifiers() {
- return lineageIdentifiers;
- }
-
- @Override
- public long getLineageStartDate() {
- return lineageStartDate;
- }
-
- @Override
- public long getFileSize() {
- return contentSize;
- }
-
- @Override
- public Long getPreviousFileSize() {
- return previousSize;
- }
-
- @Override
- public ProvenanceEventType getEventType() {
- return eventType;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size());
- allAttrs.putAll(previousAttributes);
- for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
- if (entry.getValue() != null) {
- allAttrs.put(entry.getKey(), entry.getValue());
- }
- }
- return allAttrs;
- }
-
- @Override
- public String getComponentId() {
- return componentId;
- }
-
- @Override
- public String getComponentType() {
- return componentType;
- }
-
- @Override
- public String getTransitUri() {
- return transitUri;
- }
-
- @Override
- public String getSourceSystemFlowFileIdentifier() {
- return sourceSystemFlowFileIdentifier;
- }
-
- @Override
- public String getFlowFileUuid() {
- return uuid;
- }
-
- @Override
- public List<String> getParentUuids() {
- return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
- }
-
- @Override
- public List<String> getChildUuids() {
- return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
- }
-
- @Override
- public String getAlternateIdentifierUri() {
- return alternateIdentifierUri;
- }
-
- @Override
- public long getEventDuration() {
- return eventDuration;
- }
-
- @Override
- public String getDetails() {
- return details;
- }
-
- @Override
- public String getRelationship() {
- return relationship;
- }
-
- @Override
- public long getFlowFileEntryDate() {
- return entryDate;
- }
-
- @Override
- public String getContentClaimSection() {
- return contentClaimSection;
- }
-
- @Override
- public String getContentClaimContainer() {
- return contentClaimContainer;
- }
-
- @Override
- public String getContentClaimIdentifier() {
- return contentClaimIdentifier;
- }
-
- @Override
- public Long getContentClaimOffset() {
- return contentClaimOffset;
- }
-
- @Override
- public String getSourceQueueIdentifier() {
- return sourceQueueIdentifier;
- }
-
- @Override
- public Map<String, String> getPreviousAttributes() {
- return previousAttributes;
- }
-
- @Override
- public String getPreviousContentClaimContainer() {
- return previousClaimContainer;
- }
-
- @Override
- public String getPreviousContentClaimIdentifier() {
- return previousClaimIdentifier;
- }
-
- @Override
- public Long getPreviousContentClaimOffset() {
- return previousClaimOffset;
- }
-
- @Override
- public String getPreviousContentClaimSection() {
- return previousClaimSection;
- }
-
- @Override
- public Map<String, String> getUpdatedAttributes() {
- return updatedAttributes;
- }
-
- @Override
- public int hashCode() {
- final int eventTypeCode;
- if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
- eventTypeCode = 1472;
- } else if (eventType == ProvenanceEventType.REPLAY) {
- eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
- } else {
- eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
- }
-
- return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
- + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof StandardProvenanceEventRecord)) {
- return false;
- }
-
- final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
- // If event ID's are populated and not equal, return false. If they have not yet been populated, do not
- // use them in the comparison.
- if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
- return false;
- }
- if (eventType != other.eventType) {
- return false;
- }
-
- if (!componentId.equals(other.componentId)) {
- return false;
- }
-
- if (different(parentUuids, other.parentUuids)) {
- return false;
- }
-
- if (different(childrenUuids, other.childrenUuids)) {
- return false;
- }
-
- // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
- if (!uuid.equals(other.uuid)) {
- return false;
- }
-
- if (different(transitUri, other.transitUri)) {
- return false;
- }
-
- if (different(relationship, other.relationship)) {
- return false;
- }
-
- return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
- }
-
- private boolean different(final Object a, final Object b) {
- if (a == null && b == null) {
- return false;
- }
- if (a == null || b == null) {
- return true;
- }
-
- return !a.equals(b);
- }
-
- private boolean different(final List<String> a, final List<String> b) {
- if (a == null && b == null) {
- return false;
- }
-
- if (a == null && b != null) {
- return true;
- }
-
- if (a != null && b == null) {
- return true;
- }
-
- if (a.size() != b.size()) {
- return true;
- }
-
- final List<String> sortedA = new ArrayList<>(a);
- final List<String> sortedB = new ArrayList<>(b);
-
- Collections.sort(sortedA);
- Collections.sort(sortedB);
-
- for (int i = 0; i < sortedA.size(); i++) {
- if (!sortedA.get(i).equals(sortedB.get(i))) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public String toString() {
- return "ProvenanceEventRecord ["
- + "eventId=" + eventId
- + ", eventType=" + eventType
- + ", eventTime=" + new Date(eventTime)
- + ", uuid=" + uuid
- + ", fileSize=" + contentSize
- + ", componentId=" + componentId
- + ", transitUri=" + transitUri
- + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
- + ", parentUuids=" + parentUuids
- + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
- }
-
- public static class Builder implements ProvenanceEventBuilder {
-
- private long eventTime = System.currentTimeMillis();
- private long entryDate;
- private long lineageStartDate;
- private Set<String> lineageIdentifiers = new HashSet<>();
- private ProvenanceEventType eventType = null;
- private String componentId = null;
- private String componentType = null;
- private String sourceSystemFlowFileIdentifier = null;
- private String transitUri = null;
- private String uuid = null;
- private List<String> parentUuids = null;
- private List<String> childrenUuids = null;
- private String contentType = null;
- private String alternateIdentifierUri = null;
- private String details = null;
- private String relationship = null;
- private long storageByteOffset = -1L;
- private long eventDuration = -1L;
- private String storageFilename;
-
- private String contentClaimSection;
- private String contentClaimContainer;
- private String contentClaimIdentifier;
- private Long contentClaimOffset;
- private Long contentSize;
-
- private String previousClaimSection;
- private String previousClaimContainer;
- private String previousClaimIdentifier;
- private Long previousClaimOffset;
- private Long previousSize;
-
- private String sourceQueueIdentifier;
-
- private Map<String, String> previousAttributes;
- private Map<String, String> updatedAttributes;
-
- @Override
- public Builder fromEvent(final ProvenanceEventRecord event) {
- eventTime = event.getEventTime();
- entryDate = event.getFlowFileEntryDate();
- lineageStartDate = event.getLineageStartDate();
- lineageIdentifiers = event.getLineageIdentifiers();
- eventType = event.getEventType();
- componentId = event.getComponentId();
- componentType = event.getComponentType();
- transitUri = event.getTransitUri();
- sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
- uuid = event.getFlowFileUuid();
- parentUuids = event.getParentUuids();
- childrenUuids = event.getChildUuids();
- alternateIdentifierUri = event.getAlternateIdentifierUri();
- eventDuration = event.getEventDuration();
- previousAttributes = event.getPreviousAttributes();
- updatedAttributes = event.getUpdatedAttributes();
- details = event.getDetails();
- relationship = event.getRelationship();
-
- contentClaimSection = event.getContentClaimSection();
- contentClaimContainer = event.getContentClaimContainer();
- contentClaimIdentifier = event.getContentClaimIdentifier();
- contentClaimOffset = event.getContentClaimOffset();
- contentSize = event.getFileSize();
-
- previousClaimSection = event.getPreviousContentClaimSection();
- previousClaimContainer = event.getPreviousContentClaimContainer();
- previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
- previousClaimOffset = event.getPreviousContentClaimOffset();
- previousSize = event.getPreviousFileSize();
-
- sourceQueueIdentifier = event.getSourceQueueIdentifier();
-
- if (event instanceof StandardProvenanceEventRecord) {
- final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
- storageByteOffset = standardProvEvent.storageByteOffset;
- storageFilename = standardProvEvent.storageFilename;
- }
-
- return this;
- }
-
- @Override
- public Builder setFlowFileEntryDate(final long entryDate) {
- this.entryDate = entryDate;
- return this;
- }
-
- @Override
- public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
- this.lineageIdentifiers = lineageIdentifiers;
- return this;
- }
-
- @Override
- public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
- this.previousAttributes = previousAttributes;
- this.updatedAttributes = updatedAttributes;
- return this;
- }
-
- @Override
- public Builder setFlowFileUUID(final String uuid) {
- this.uuid = uuid;
- return this;
- }
-
- public Builder setStorageLocation(final String filename, final long offset) {
- this.storageFilename = filename;
- this.storageByteOffset = offset;
- return this;
- }
-
- @Override
- public Builder setEventTime(long eventTime) {
- this.eventTime = eventTime;
- return this;
- }
-
- @Override
- public Builder setEventDuration(final long millis) {
- this.eventDuration = millis;
- return this;
- }
-
- @Override
- public Builder setLineageStartDate(final long startDate) {
- this.lineageStartDate = startDate;
- return this;
- }
-
- public Builder addLineageIdentifier(final String lineageIdentifier) {
- this.lineageIdentifiers.add(lineageIdentifier);
- return this;
- }
-
- @Override
- public Builder setEventType(ProvenanceEventType eventType) {
- this.eventType = eventType;
- return this;
- }
-
- @Override
- public Builder setComponentId(String componentId) {
- this.componentId = componentId;
- return this;
- }
-
- @Override
- public Builder setComponentType(String componentType) {
- this.componentType = componentType;
- return this;
- }
-
- @Override
- public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) {
- this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
- return this;
- }
-
- @Override
- public Builder setTransitUri(String transitUri) {
- this.transitUri = transitUri;
- return this;
- }
-
- @Override
- public Builder addParentFlowFile(final FlowFile parentFlowFile) {
- if (this.parentUuids == null) {
- this.parentUuids = new ArrayList<>();
- }
- this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
- return this;
- }
-
- public Builder addParentUuid(final String uuid) {
- if (this.parentUuids == null) {
- this.parentUuids = new ArrayList<>();
- }
- this.parentUuids.add(uuid);
- return this;
- }
-
- @Override
- public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
- if (this.parentUuids == null) {
- return this;
- }
-
- parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
- return this;
- }
-
- @Override
- public Builder addChildFlowFile(final FlowFile childFlowFile) {
- if (this.childrenUuids == null) {
- this.childrenUuids = new ArrayList<>();
- }
- this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
- return this;
- }
-
- public Builder addChildUuid(final String uuid) {
- if (this.childrenUuids == null) {
- this.childrenUuids = new ArrayList<>();
- }
- this.childrenUuids.add(uuid);
- return this;
- }
-
- @Override
- public Builder removeChildFlowFile(final FlowFile childFlowFile) {
- if (this.childrenUuids == null) {
- return this;
- }
-
- childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
- return this;
- }
-
- public Builder setContentType(String contentType) {
- this.contentType = contentType;
- return this;
- }
-
- @Override
- public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
- this.alternateIdentifierUri = alternateIdentifierUri;
- return this;
- }
-
- @Override
- public Builder setDetails(String details) {
- this.details = details;
- return this;
- }
-
- @Override
- public Builder setRelationship(Relationship relationship) {
- this.relationship = relationship.getName();
- return this;
- }
-
- public Builder setRelationship(final String relationship) {
- this.relationship = relationship;
- return this;
- }
-
- @Override
- public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
- setFlowFileEntryDate(flowFile.getEntryDate());
- setLineageIdentifiers(flowFile.getLineageIdentifiers());
- setLineageStartDate(flowFile.getLineageStartDate());
- setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
- uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- this.contentSize = flowFile.getSize();
- return this;
- }
-
- @Override
- public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
- previousClaimSection = section;
- previousClaimContainer = container;
- previousClaimIdentifier = identifier;
- previousClaimOffset = offset;
- previousSize = size;
- return this;
- }
-
- @Override
- public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
- contentClaimSection = section;
- contentClaimContainer = container;
- contentClaimIdentifier = identifier;
- contentClaimOffset = offset;
- contentSize = size;
- return this;
- }
-
- @Override
- public Builder setSourceQueueIdentifier(final String identifier) {
- sourceQueueIdentifier = identifier;
- return this;
- }
-
- private void assertSet(final Object value, final String name) {
- if (value == null) {
- throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
- }
- }
-
- public ProvenanceEventType getEventType() {
- return eventType;
- }
-
- public List<String> getChildUuids() {
- return Collections.unmodifiableList(childrenUuids);
- }
-
- public List<String> getParentUuids() {
- return Collections.unmodifiableList(parentUuids);
- }
-
- @Override
- public StandardProvenanceEventRecord build() {
- assertSet(eventType, "Event Type");
- assertSet(componentId, "Component ID");
- assertSet(componentType, "Component Type");
- assertSet(uuid, "FlowFile UUID");
- assertSet(contentSize, "FlowFile Size");
-
- switch (eventType) {
- case ADDINFO:
- if (alternateIdentifierUri == null) {
- throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
- }
- break;
- case RECEIVE:
- case SEND:
- assertSet(transitUri, "Transit URI");
- break;
- case ROUTE:
- assertSet(relationship, "Relationship");
- break;
- case CLONE:
- case FORK:
- case JOIN:
- if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) {
- throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
- }
- break;
- default:
- break;
- }
-
- return new StandardProvenanceEventRecord(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
deleted file mode 100644
index 9a9a27d..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.provenance.search.Query;
-import org.apache.nifi.provenance.search.QueryResult;
-
-public class StandardQueryResult implements QueryResult {
-
- public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
- private final Query query;
- private final long creationNanos;
-
- private final int numSteps;
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
-
- private final Lock writeLock = rwLock.writeLock();
- // guarded by writeLock
- private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
- private long totalHitCount;
- private int numCompletedSteps = 0;
- private Date expirationDate;
- private String error;
- private long queryTime;
-
- private volatile boolean canceled = false;
-
- public StandardQueryResult(final Query query, final int numSteps) {
- this.query = query;
- this.numSteps = numSteps;
- this.creationNanos = System.nanoTime();
-
- updateExpiration();
- }
-
- @Override
- public List<ProvenanceEventRecord> getMatchingEvents() {
- readLock.lock();
- try {
- if (matchingRecords.size() <= query.getMaxResults()) {
- return new ArrayList<>(matchingRecords);
- }
-
- final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
- for (int i = 0; i < query.getMaxResults(); i++) {
- copy.add(matchingRecords.get(i));
- }
-
- return copy;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public long getTotalHitCount() {
- readLock.lock();
- try {
- return totalHitCount;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public long getQueryTime() {
- return queryTime;
- }
-
- @Override
- public Date getExpiration() {
- return expirationDate;
- }
-
- @Override
- public String getError() {
- return error;
- }
-
- @Override
- public int getPercentComplete() {
- readLock.lock();
- try {
- return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public boolean isFinished() {
- readLock.lock();
- try {
- return numCompletedSteps >= numSteps || canceled;
- } finally {
- readLock.unlock();
- }
- }
-
- void cancel() {
- this.canceled = true;
- }
-
- public void setError(final String error) {
- writeLock.lock();
- try {
- this.error = error;
- numCompletedSteps++;
-
- updateExpiration();
- if (numCompletedSteps >= numSteps) {
- final long searchNanos = System.nanoTime() - creationNanos;
- queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
- writeLock.lock();
- try {
- this.matchingRecords.addAll(matchingRecords);
- this.totalHitCount += totalHits;
-
- numCompletedSteps++;
- updateExpiration();
-
- if (numCompletedSteps >= numSteps) {
- final long searchNanos = System.nanoTime() - creationNanos;
- queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Must be called with write lock!
- */
- private void updateExpiration() {
- expirationDate = new Date(System.currentTimeMillis() + TTL);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
deleted file mode 100644
index 0aaf5ef..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import static java.util.Objects.requireNonNull;
-
-public class EdgeNode implements LineageEdge {
-
- private final String uuid;
- private final LineageNode source;
- private final LineageNode destination;
-
- public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) {
- this.uuid = uuid;
- this.source = requireNonNull(source);
- this.destination = requireNonNull(destination);
- }
-
- @Override
- public String getUuid() {
- return uuid;
- }
-
- @Override
- public LineageNode getSource() {
- return source;
- }
-
- @Override
- public LineageNode getDestination() {
- return destination;
- }
-
- @Override
- public int hashCode() {
- return 43298293 + source.hashCode() + destination.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
-
- if (!(obj instanceof EdgeNode)) {
- return false;
- }
-
- final EdgeNode other = (EdgeNode) obj;
- return (source.equals(other.source) && destination.equals(other.destination));
- }
-
- @Override
- public String toString() {
- return "Edge[Source=" + source + ", Destination=" + destination + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
deleted file mode 100644
index 12d9a4f..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import java.util.List;
-
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-
-public class EventNode implements ProvenanceEventLineageNode {
-
- private final ProvenanceEventRecord record;
- private String clusterNodeIdentifier = null;
-
- public EventNode(final ProvenanceEventRecord event) {
- this.record = event;
- }
-
- @Override
- public String getIdentifier() {
- return String.valueOf(getEventIdentifier());
- }
-
- @Override
- public String getClusterNodeIdentifier() {
- return clusterNodeIdentifier;
- }
-
- public void setClusterNodeIdentifier(final String nodeIdentifier) {
- this.clusterNodeIdentifier = nodeIdentifier;
- }
-
- @Override
- public LineageNodeType getNodeType() {
- return LineageNodeType.PROVENANCE_EVENT_NODE;
- }
-
- @Override
- public ProvenanceEventType getEventType() {
- return record.getEventType();
- }
-
- @Override
- public long getTimestamp() {
- return record.getEventTime();
- }
-
- @Override
- public long getEventIdentifier() {
- return record.getEventId();
- }
-
- @Override
- public String getFlowFileUuid() {
- return record.getAttributes().get(CoreAttributes.UUID.key());
- }
-
- @Override
- public List<String> getParentUuids() {
- return record.getParentUuids();
- }
-
- @Override
- public List<String> getChildUuids() {
- return record.getChildUuids();
- }
-
- @Override
- public int hashCode() {
- return 2938472 + record.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof EventNode)) {
- return false;
- }
-
- final EventNode other = (EventNode) obj;
- return record.equals(other.record);
- }
-
- @Override
- public String toString() {
- return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
deleted file mode 100644
index c36c38d..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-public class FlowFileLineage implements Lineage {
-
- private final List<LineageNode> nodes;
- private final List<LineageEdge> edges;
-
- public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) {
- this.nodes = new ArrayList<>(requireNonNull(nodes));
- this.edges = new ArrayList<>(requireNonNull(edges));
- }
-
- @Override
- public List<LineageNode> getNodes() {
- return nodes;
- }
-
- @Override
- public List<LineageEdge> getEdges() {
- return edges;
- }
-
- @Override
- public int hashCode() {
- int sum = 923;
- for (final LineageNode node : nodes) {
- sum += node.hashCode();
- }
-
- for (final LineageEdge edge : edges) {
- sum += edge.hashCode();
- }
-
- return sum;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
-
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof FlowFileLineage)) {
- return false;
- }
-
- final FlowFileLineage other = (FlowFileLineage) obj;
- return nodes.equals(other.nodes) && edges.equals(other.edges);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
deleted file mode 100644
index fdc7470..0000000
--- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lineage;
-
-import static java.util.Objects.requireNonNull;
-
-public class FlowFileNode implements LineageNode {
-
- private final String flowFileUuid;
- private final long creationTime;
- private String clusterNodeIdentifier;
-
- public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) {
- this.flowFileUuid = requireNonNull(flowFileUuid);
- this.creationTime = flowFileCreationTime;
- }
-
- @Override
- public String getIdentifier() {
- return flowFileUuid;
- }
-
- @Override
- public long getTimestamp() {
- return creationTime;
- }
-
- @Override
- public String getClusterNodeIdentifier() {
- return clusterNodeIdentifier;
- }
-
- @Override
- public LineageNodeType getNodeType() {
- return LineageNodeType.FLOWFILE_NODE;
- }
-
- @Override
- public String getFlowFileUuid() {
- return flowFileUuid;
- }
-
- @Override
- public int hashCode() {
- return 23498723 + flowFileUuid.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
- return true;
- }
-
- if (!(obj instanceof FlowFileNode)) {
- return false;
- }
-
- final FlowFileNode other = (FlowFileNode) obj;
- return flowFileUuid.equals(other.flowFileUuid);
- }
-
- @Override
- public String toString() {
- return "FlowFile[UUID=" + flowFileUuid + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/pom.xml b/nifi/commons/flowfile-packager/pom.xml
deleted file mode 100644
index 6e8d58d..0000000
--- a/nifi/commons/flowfile-packager/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>flowfile-packager</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <name>FlowFile Packager</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
deleted file mode 100644
index ae16f99..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-public interface FlowFilePackager {
-
- void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
deleted file mode 100644
index 07baab1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.lang3.StringEscapeUtils;
-
-public class FlowFilePackagerV1 implements FlowFilePackager {
-
- public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
- public static final String FILENAME_CONTENT = "flowfile.content";
- public static final int DEFAULT_TAR_PERMISSIONS = 0644;
-
- private final int tarPermissions;
-
- public FlowFilePackagerV1() {
- this(DEFAULT_TAR_PERMISSIONS);
- }
-
- public FlowFilePackagerV1(final int tarPermissions) {
- this.tarPermissions = tarPermissions;
- }
-
- @Override
- public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
- try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) {
- writeAttributesEntry(attributes, tout);
- writeContentEntry(tout, in, fileSize);
- tout.finish();
- tout.flush();
- tout.close();
- }
- }
-
- private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException {
- final StringBuilder sb = new StringBuilder();
- sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n");
- sb.append("<properties>");
- for (final Map.Entry<String, String> entry : attributes.entrySet()) {
- final String escapedKey = StringEscapeUtils.escapeXml11(entry.getKey());
- final String escapedValue = StringEscapeUtils.escapeXml11(entry.getValue());
- sb.append("\n <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>");
- }
- sb.append("</properties>");
-
- final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
- final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES);
- attribEntry.setMode(tarPermissions);
- attribEntry.setSize(metaBytes.length);
- tout.putArchiveEntry(attribEntry);
- tout.write(metaBytes);
- tout.closeArchiveEntry();
- }
-
- private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException {
- final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT);
- entry.setMode(tarPermissions);
- entry.setSize(fileSize);
- tarOut.putArchiveEntry(entry);
- final byte[] buffer = new byte[512 << 10];//512KB
- int bytesRead = 0;
- while ((bytesRead = inStream.read(buffer)) != -1) { //still more data to read
- if (bytesRead > 0) {
- tarOut.write(buffer, 0, bytesRead);
- }
- }
-
- copy(inStream, tarOut);
- tarOut.closeArchiveEntry();
- }
-
- public static long copy(final InputStream source, final OutputStream destination) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- long totalCount = 0L;
- while ((len = source.read(buffer)) > 0) {
- destination.write(buffer, 0, len);
- totalCount += len;
- }
- return totalCount;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
deleted file mode 100644
index 6f9d6b1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * <p>
- * Packages a FlowFile, including both its content and its attributes into a
- * single file that is stream-friendly. The encoding scheme is as such:
- * </p>
- *
- * <pre>
- * Length Field : indicates the number of Flow File Attributes in the stream
- * 1 to N times (N=number of Flow File Attributes):
- * String Field : Flow File Attribute key name
- * String Field : Flow File Attribute value
- * Long : 8 bytes indicating the length of the Flow File content
- * Content : The next M bytes are the content of the Flow File.
- * </pre>
- *
- * <pre>
- * Encoding of String Field is as follows:
- * Length Field : indicates the length of the String
- * 1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value.
- * </pre>
- *
- * <pre>
- * Encoding of Length Field is as follows:
- * First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is
- * greater than or equal to 65536 bytes; therefore, the next 4 bytes will indicate the actual length.
- * </pre>
- *
- * <p>
- * Note: All byte-order encoding is Network Byte Order (Most Significant Byte
- * first)
- * </p>
- *
- * <p>
- * The following example shows the bytes expected if we were to encode a
- * FlowFile containing the following attributes where the content is the text
- * "Hello World!":
- *
- * <br><br>
- * Attributes:
- * <pre>
- * +-------+-------+
- * | Key + Value |
- * + --------------+
- * | A | a |
- * + --------------+
- * | B | b |
- * + --------------+
- * </pre> Content:<br>
- * Hello World!
- * <br><br>
- * Packaged Byte Encoding (In Hexadecimal Form):
- * <p>
- *
- * <pre>
- * 00 02 00 01 41 00 01 61
- * 00 01 42 00 01 62 00 00
- * 00 00 00 00 00 0C 48 65
- * 6C 6C 6F 20 57 6F 72 6C
- * 64 21
- * </pre>
- */
-public class FlowFilePackagerV2 implements FlowFilePackager {
-
- private static final int MAX_VALUE_2_BYTES = 65535;
- private final byte[] writeBuffer = new byte[8];
-
- @Override
- public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
- writeFieldLength(out, attributes.size()); //write out the number of attributes
- for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- writeLong(out, fileSize);//write out length of data
- copy(in, out);//write out the actual flow file payload
- }
-
- private void copy(final InputStream in, final OutputStream out) throws IOException {
- final byte[] buffer = new byte[65536];
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- }
-
- private void writeString(final String val, final OutputStream out) throws IOException {
- final byte[] bytes = val.getBytes("UTF-8");
- writeFieldLength(out, bytes.length);
- out.write(bytes);
- }
-
- private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
- // If the value is less than the max value that can be fit into 2 bytes, just use the
- // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
- // 4 bytes to indicate the real length.
- if (numBytes < MAX_VALUE_2_BYTES) {
- writeBuffer[0] = (byte) (numBytes >>> 8);
- writeBuffer[1] = (byte) (numBytes);
- out.write(writeBuffer, 0, 2);
- } else {
- writeBuffer[0] = (byte) 0xff;
- writeBuffer[1] = (byte) 0xff;
- writeBuffer[2] = (byte) (numBytes >>> 24);
- writeBuffer[3] = (byte) (numBytes >>> 16);
- writeBuffer[4] = (byte) (numBytes >>> 8);
- writeBuffer[5] = (byte) (numBytes);
- out.write(writeBuffer, 0, 6);
- }
- }
-
- private void writeLong(final OutputStream out, final long val) throws IOException {
- writeBuffer[0] = (byte) (val >>> 56);
- writeBuffer[1] = (byte) (val >>> 48);
- writeBuffer[2] = (byte) (val >>> 40);
- writeBuffer[3] = (byte) (val >>> 32);
- writeBuffer[4] = (byte) (val >>> 24);
- writeBuffer[5] = (byte) (val >>> 16);
- writeBuffer[6] = (byte) (val >>> 8);
- writeBuffer[7] = (byte) (val);
- out.write(writeBuffer, 0, 8);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
deleted file mode 100644
index 181f3e3..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-public class FlowFilePackagerV3 implements FlowFilePackager {
-
- public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
- private static final int MAX_VALUE_2_BYTES = 65535;
- private final byte[] writeBuffer = new byte[8];
-
- @Override
- public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
- out.write(MAGIC_HEADER);
-
- if (attributes == null) {
- writeFieldLength(out, 0);
- } else {
- writeFieldLength(out, attributes.size()); //write out the number of attributes
- for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- }
-
- writeLong(out, fileSize);//write out length of data
- copy(in, out);//write out the actual flow file payload
- }
-
- private void copy(final InputStream in, final OutputStream out) throws IOException {
- final byte[] buffer = new byte[65536];
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- }
-
- private void writeString(final String val, final OutputStream out) throws IOException {
- final byte[] bytes = val.getBytes("UTF-8");
- writeFieldLength(out, bytes.length);
- out.write(bytes);
- }
-
- private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
- // If the value is less than the max value that can be fit into 2 bytes, just use the
- // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
- // 4 bytes to indicate the real length.
- if (numBytes < MAX_VALUE_2_BYTES) {
- writeBuffer[0] = (byte) (numBytes >>> 8);
- writeBuffer[1] = (byte) (numBytes);
- out.write(writeBuffer, 0, 2);
- } else {
- writeBuffer[0] = (byte) 0xff;
- writeBuffer[1] = (byte) 0xff;
- writeBuffer[2] = (byte) (numBytes >>> 24);
- writeBuffer[3] = (byte) (numBytes >>> 16);
- writeBuffer[4] = (byte) (numBytes >>> 8);
- writeBuffer[5] = (byte) (numBytes);
- out.write(writeBuffer, 0, 6);
- }
- }
-
- private void writeLong(final OutputStream out, final long val) throws IOException {
- writeBuffer[0] = (byte) (val >>> 56);
- writeBuffer[1] = (byte) (val >>> 48);
- writeBuffer[2] = (byte) (val >>> 40);
- writeBuffer[3] = (byte) (val >>> 32);
- writeBuffer[4] = (byte) (val >>> 24);
- writeBuffer[5] = (byte) (val >>> 16);
- writeBuffer[6] = (byte) (val >>> 8);
- writeBuffer[7] = (byte) (val);
- out.write(writeBuffer, 0, 8);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
deleted file mode 100644
index fd9d92d..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-public interface FlowFileUnpackager {
-
- Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException;
-
- boolean hasMoreData() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
----------------------------------------------------------------------
diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
deleted file mode 100644
index f8ef3d1..0000000
--- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-
-public class FlowFileUnpackagerV1 implements FlowFileUnpackager {
-
- private int flowFilesRead = 0;
-
- @Override
- public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
- flowFilesRead++;
- final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);
- final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
- if (attribEntry == null) {
- return null;
- }
-
- final Map<String, String> attributes;
- if (attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
- attributes = getAttributes(tarIn);
- } else {
- throw new IOException("Expected two tar entries: "
- + FlowFilePackagerV1.FILENAME_CONTENT + " and "
- + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
- }
-
- final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();
-
- if (contentEntry != null && contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
- final byte[] buffer = new byte[512 << 10];//512KB
- int bytesRead = 0;
- while ((bytesRead = tarIn.read(buffer)) != -1) { //still more data to read
- if (bytesRead > 0) {
- out.write(buffer, 0, bytesRead);
- }
- }
- out.flush();
- } else {
- throw new IOException("Expected two tar entries: "
- + FlowFilePackagerV1.FILENAME_CONTENT + " and "
- + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
- }
-
- return attributes;
- }
-
- protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException {
-
- final Properties props = new Properties();
- props.loadFromXML(new NonCloseableInputStream(stream));
-
- final Map<String, String> result = new HashMap<>();
- for (final Entry<Object, Object> entry : props.entrySet()) {
- final Object keyObject = entry.getKey();
- final Object valueObject = entry.getValue();
- if (!(keyObject instanceof String)) {
- throw new IOException("Flow file attributes object contains key of type "
- + keyObject.getClass().getCanonicalName()
- + " but expected java.lang.String");
- } else if (!(keyObject instanceof String)) {
- throw new IOException("Flow file attributes object contains value of type "
- + keyObject.getClass().getCanonicalName()
- + " but expected java.lang.String");
- }
-
- final String key = (String) keyObject;
- final String value = (String) valueObject;
- result.put(key, value);
- }
-
- return result;
- }
-
- @Override
- public boolean hasMoreData() throws IOException {
- return flowFilesRead == 0;
- }
-
- public static final class NonCloseableInputStream extends InputStream {
-
- final InputStream stream;
-
- public NonCloseableInputStream(final InputStream stream) {
- this.stream = stream;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public int read() throws IOException {
- return stream.read();
- }
-
- @Override
- public int available() throws IOException {
- return stream.available();
- }
-
- @Override
- public synchronized void mark(int readlimit) {
- stream.mark(readlimit);
- }
-
- @Override
- public synchronized void reset() throws IOException {
- stream.reset();
- }
-
- @Override
- public boolean markSupported() {
- return stream.markSupported();
- }
-
- @Override
- public long skip(long n) throws IOException {
- return stream.skip(n);
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- return stream.read(b, off, len);
- }
-
- @Override
- public int read(byte b[]) throws IOException {
- return stream.read(b);
- }
- }
-}