You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/02 17:27:35 UTC
[10/13] nifi git commit: NIFI-3594 Implemented encrypted provenance
repository. Added src/test/resources/logback-test.xml files resetting log
level from DEBUG (in nifi-data-provenance-utils) to WARN because later tests
depend on MockComponentLog recording [... subject truncated by the mail archive]
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 7a2f57e..5a75172 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.util.FormatUtils;
@@ -50,6 +49,12 @@ public class RepositoryConfiguration {
private int journalCount = 16;
private int compressionBlockBytes = 1024 * 1024;
private int maxAttributeChars = 65536;
+ private int debugFrequency = 1_000_000;
+
+ private Map<String, String> encryptionKeys;
+ private String keyId;
+ private String keyProviderImplementation;
+ private String keyProviderLocation;
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
@@ -360,6 +365,54 @@ public class RepositoryConfiguration {
return Optional.ofNullable(warmCacheFrequencyMinutes);
}
+ public boolean supportsEncryption() {
+ boolean keyProviderIsConfigured = CryptoUtils.isValidKeyProvider(keyProviderImplementation, keyProviderLocation, keyId, encryptionKeys);
+
+ return keyProviderIsConfigured;
+ }
+
+ public Map<String, String> getEncryptionKeys() {
+ return encryptionKeys;
+ }
+
+ public void setEncryptionKeys(Map<String, String> encryptionKeys) {
+ this.encryptionKeys = encryptionKeys;
+ }
+
+ public String getKeyId() {
+ return keyId;
+ }
+
+ public void setKeyId(String keyId) {
+ this.keyId = keyId;
+ }
+
+ public String getKeyProviderImplementation() {
+ return keyProviderImplementation;
+ }
+
+ public void setKeyProviderImplementation(String keyProviderImplementation) {
+ this.keyProviderImplementation = keyProviderImplementation;
+ }
+
+ public String getKeyProviderLocation() {
+ return keyProviderLocation;
+ }
+
+ public void setKeyProviderLocation(String keyProviderLocation) {
+ this.keyProviderLocation = keyProviderLocation;
+ }
+
+
+ public int getDebugFrequency() {
+ return debugFrequency;
+ }
+
+ public void setDebugFrequency(int debugFrequency) {
+ this.debugFrequency = debugFrequency;
+ }
+
+
public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
if (storageDirectories.isEmpty()) {
@@ -436,6 +489,17 @@ public class RepositoryConfiguration {
config.setAlwaysSync(alwaysSync);
+ config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency()));
+
+ // Encryption values may not be present but are only required for EncryptedWriteAheadProvenanceRepository
+ final String implementationClassName = nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+ if (EncryptedWriteAheadProvenanceRepository.class.getName().equals(implementationClassName)) {
+ config.setEncryptionKeys(nifiProperties.getProvenanceRepoEncryptionKeys());
+ config.setKeyId(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID));
+ config.setKeyProviderImplementation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS));
+ config.setKeyProviderLocation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION));
+ }
+
return config;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
index 8975028..4782dbe 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
@@ -84,7 +83,7 @@ import org.slf4j.LoggerFactory;
*/
public class WriteAheadProvenanceRepository implements ProvenanceRepository {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadProvenanceRepository.class);
- private static final int BLOCK_SIZE = 1024 * 32;
+ static final int BLOCK_SIZE = 1024 * 32;
public static final String EVENT_CATEGORY = "Provenance Repository";
private final RepositoryConfiguration config;
@@ -129,6 +128,14 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
}
};
+ init(recordWriterFactory, recordReaderFactory, eventReporter, authorizer, resourceFactory);
+ }
+
+ synchronized void init(RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory,
+ final EventReporter eventReporter, final Authorizer authorizer,
+ final ProvenanceAuthorizableFactory resourceFactory) throws IOException {
+ final EventFileManager fileManager = new EventFileManager();
+
eventStore = new PartitionedWriteAheadEventStore(config, recordWriterFactory, recordReaderFactory, eventReporter, fileManager);
final IndexManager indexManager = new SimpleIndexManager(config);
@@ -145,7 +152,7 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
eventStore.reindexLatestEvents(eventIndex);
} catch (final Exception e) {
logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
- + "events will not be available from the Provenance Repository when a query is issued.", e);
+ + "events will not be available from the Provenance Repository when a query is issued.", e);
}
}
@@ -282,4 +289,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
public List<SearchableField> getSearchableAttributes() {
return Collections.unmodifiableList(config.getSearchableAttributes());
}
+
+ RepositoryConfiguration getConfig() {
+ return this.config;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index a583403..f4b47d3 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -36,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
@@ -246,7 +245,7 @@ public class LuceneEventIndex implements EventIndex {
final Document document = eventConverter.convert(event, summary);
if (document == null) {
- logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+ logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
} else {
final File indexDir;
if (event.getEventTime() == lastEventTime) {
@@ -291,7 +290,7 @@ public class LuceneEventIndex implements EventIndex {
final Document document = eventConverter.convert(event, location);
if (document == null) {
- logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+ logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
} else {
final StoredDocument doc = new StoredDocument(document, location);
boolean added = false;
@@ -357,13 +356,13 @@ public class LuceneEventIndex implements EventIndex {
eventOption = eventStore.getEvent(eventId);
} catch (final Exception e) {
logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e);
- final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+ final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity());
result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
return result;
}
if (!eventOption.isPresent()) {
- final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+ final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity());
result.getResult().setError("Could not find Provenance Event with ID " + eventId);
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
return result;
@@ -524,7 +523,7 @@ public class LuceneEventIndex implements EventIndex {
}
default: {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
- eventId, Collections.<String> emptyList(), 1, userId);
+ eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
@@ -533,7 +532,7 @@ public class LuceneEventIndex implements EventIndex {
}
} catch (final Exception e) {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
- eventId, Collections.<String> emptyList(), 1, userId);
+ eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Failed to expand children for lineage of event with ID " + eventId + " due to: " + e);
return submission;
@@ -564,7 +563,7 @@ public class LuceneEventIndex implements EventIndex {
}
default: {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
- eventId, Collections.<String> emptyList(), 1, userId);
+ eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
@@ -573,7 +572,7 @@ public class LuceneEventIndex implements EventIndex {
}
} catch (final Exception e) {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
- eventId, Collections.<String> emptyList(), 1, userId);
+ eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
index d6f50dd..2bc7fbe 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
@@ -56,4 +56,12 @@ public class EventFieldNames {
public static final String EXPLICIT_VALUE = "Explicit Value";
public static final String LOOKUP_VALUE = "Lookup Value";
public static final String UNCHANGED_VALUE = "Unchanged";
+
+ // For encrypted records
+ public static final String IS_ENCRYPTED = "Encrypted Record";
+ public static final String KEY_ID = "Encryption Key ID";
+ public static final String VERSION = "Encryption Version";
+ public static final String ALGORITHM = "Encryption Algorithm";
+ public static final String IV = "Initialization Vector";
+ public static final String ENCRYPTION_DETAILS = "Encryption Details";
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
index 7b33ded..0577636 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
@@ -66,12 +66,12 @@ public class LookupTableEventRecordFields {
public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
- CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+ CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
public static final RecordField CURRENT_CONTENT_CLAIM_EXPLICIT = new ComplexRecordField(EventFieldNames.EXPLICIT_VALUE, EXACTLY_ONE,
- CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+ CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
public static final RecordField CURRENT_CONTENT_CLAIM = new UnionRecordField(EventFieldNames.CONTENT_CLAIM,
- Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
+ Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
// EventType-Specific fields
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
index 7110336..d596c8e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
@@ -45,7 +45,6 @@ import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.UPD
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;
@@ -90,5 +89,4 @@ public class LookupTableEventSchema {
final RecordSchema schema = new RecordSchema(fields);
return schema;
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
index 93c0669..dfbcd2b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
@@ -25,7 +25,6 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
-
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.toc.TocReader;
@@ -333,7 +332,7 @@ public abstract class CompressableRecordReader implements RecordReader {
try {
boolean read = true;
while (read) {
- final Optional<StandardProvenanceEventRecord> eventOptional = readToEvent(eventId, dis, serializationVersion);
+ final Optional<StandardProvenanceEventRecord> eventOptional = this.readToEvent(eventId, dis, serializationVersion);
if (eventOptional.isPresent()) {
pushbackEvent = eventOptional.get();
return Optional.of(pushbackEvent);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8e79ddd..7ae4adc 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -27,9 +27,11 @@ import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.zip.GZIPInputStream;
-
+import org.apache.nifi.properties.NiFiPropertiesLoader;
import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
+import org.apache.nifi.provenance.CryptoUtils;
+import org.apache.nifi.provenance.EncryptedSchemaRecordReader;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordReader;
import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
import org.apache.nifi.provenance.StandardRecordReader;
@@ -37,17 +39,25 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RecordReaders {
+ private static Logger logger = LoggerFactory.getLogger(RecordReaders.class);
+
+ private static boolean isEncryptionAvailable = false;
+ private static boolean encryptionPropertiesRead = false;
+
/**
* Creates a new Record Reader that is capable of reading Provenance Event Journals
*
- * @param file the Provenance Event Journal to read data from
+ * @param file the Provenance Event Journal to read data from
* @param provenanceLogFiles collection of all provenance journal files
- * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid
- * issues where a FlowFile has an extremely large attribute and reading events
- * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
+ * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid
+ * issues where a FlowFile has an extremely large attribute and reading events
+ * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
* @return a Record Reader capable of reading Provenance Event Journals
* @throws IOException if unable to create a Record Reader for the given file
*/
@@ -68,7 +78,7 @@ public class RecordReaders {
}
}
- if ( file.exists() ) {
+ if (file.exists()) {
try {
fis = new FileInputStream(file);
} catch (final FileNotFoundException fnfe) {
@@ -77,7 +87,8 @@ public class RecordReaders {
}
String filename = file.getName();
- openStream: while ( fis == null ) {
+ openStream:
+ while (fis == null) {
final File dir = file.getParentFile();
final String baseName = LuceneUtil.substringBefore(file.getName(), ".prov");
@@ -85,9 +96,9 @@ public class RecordReaders {
// filename that we need. The majority of the time, we will use the extension ".prov.gz"
// because most often we are compressing on rollover and most often we have already finished
// compressing by the time that we are querying the data.
- for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+ for (final String extension : new String[]{".prov.gz", ".prov"}) {
file = new File(dir, baseName + extension);
- if ( file.exists() ) {
+ if (file.exists()) {
try {
fis = new FileInputStream(file);
filename = baseName + extension;
@@ -104,7 +115,7 @@ public class RecordReaders {
break;
}
- if ( fis == null ) {
+ if (fis == null) {
throw new FileNotFoundException("Unable to locate file " + originalFile);
}
@@ -148,12 +159,25 @@ public class RecordReaders {
final TocReader tocReader = new StandardTocReader(tocFile);
return new EventIdFirstSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
}
+ case EncryptedSchemaRecordReader.SERIALIZATION_NAME: {
+ if (!tocFile.exists()) {
+ throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist");
+ }
+
+ if (!isEncryptionAvailable()) {
+ throw new IOException("Cannot read encrypted repository because this reader is not configured for encryption");
+ }
+
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ // Return a reader with no eventEncryptor because this method contract cannot change, then inject the encryptor from the writer in the calling method
+ return new EncryptedSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars, null);
+ }
default: {
throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName);
}
}
} catch (final IOException ioe) {
- if ( fis != null ) {
+ if (fis != null) {
try {
fis.close();
} catch (final IOException inner) {
@@ -165,4 +189,20 @@ public class RecordReaders {
}
}
+ private static boolean isEncryptionAvailable() {
+ if (encryptionPropertiesRead) {
+ return isEncryptionAvailable;
+ } else {
+ try {
+ NiFiProperties niFiProperties = NiFiPropertiesLoader.loadDefaultWithKeyFromBootstrap();
+ isEncryptionAvailable = CryptoUtils.isProvenanceRepositoryEncryptionConfigured(niFiProperties);
+ encryptionPropertiesRead = true;
+ } catch (IOException e) {
+ logger.error("Encountered an error checking the provenance repository encryption configuration: ", e);
+ isEncryptionAvailable = false;
+ }
+ return isEncryptionAvailable;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
index 41d5ade..766278a 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.util;
import java.util.List;
import java.util.Map;
-
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.serialization.StorageSummary;
@@ -182,4 +181,14 @@ public class StorageSummaryEvent implements ProvenanceEventRecord {
public Long getPreviousContentClaimOffset() {
return event.getPreviousContentClaimOffset();
}
+
+ /**
+ * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
+ *
+ * @return a descriptive event ID to allow tracing
+ */
+ @Override
+ public String getBestEventIdentifier() {
+ return Long.toString(getEventId());
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
index 6a353d2..94cc70c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.provenance.PersistentProvenanceRepository
-org.apache.nifi.provenance.WriteAheadProvenanceRepository
\ No newline at end of file
+org.apache.nifi.provenance.WriteAheadProvenanceRepository
+org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
new file mode 100644
index 0000000..ec8c225
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.serialization.RecordReader
+import org.apache.nifi.provenance.serialization.RecordWriter
+import org.apache.nifi.provenance.toc.StandardTocReader
+import org.apache.nifi.provenance.toc.StandardTocWriter
+import org.apache.nifi.provenance.toc.TocReader
+import org.apache.nifi.provenance.toc.TocUtil
+import org.apache.nifi.provenance.toc.TocWriter
+import org.apache.nifi.util.file.FileUtils
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.AfterClass
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.ClassRule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import javax.crypto.Cipher
+import javax.crypto.spec.SecretKeySpec
+import java.security.KeyManagementException
+import java.security.Security
+import java.util.concurrent.atomic.AtomicLong
+
+import static groovy.test.GroovyAssert.shouldFail
+import static org.apache.nifi.provenance.TestUtil.createFlowFile
+
+@RunWith(JUnit4.class)
+class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWriter {
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReaderWriterTest.class)
+
+ private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
+ private static final String KEY_HEX_256 = KEY_HEX_128 * 2
+ private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
+ private static final String KEY_ID = "K1"
+
+ private static final String TRANSIT_URI = "nifi://unit-test"
+ private static final String PROCESSOR_TYPE = "Mock Processor"
+ private static final String COMPONENT_ID = "1234"
+
+ private static final int UNCOMPRESSED_BLOCK_SIZE = 1024 * 32
+ private static final int MAX_ATTRIBUTE_SIZE = 2048
+
+ private static final AtomicLong idGenerator = new AtomicLong(0L)
+ private File journalFile
+ private File tocFile
+
+ private static KeyProvider mockKeyProvider
+ private static ProvenanceEventEncryptor provenanceEventEncryptor = new AESProvenanceEventEncryptor()
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder()
+
+ private static String ORIGINAL_LOG_LEVEL
+
+ /**
+ * One-time fixture: raises the provenance log level, registers the BouncyCastle provider,
+ * installs a catch-all logging hook on the logger and builds the mock {@link KeyProvider}
+ * used to initialize the shared encryptor.
+ */
+ @BeforeClass
+ static void setUpOnce() throws Exception {
+ // Remember the current value of the property so tearDownOnce() can restore it
+ ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
+
+ Security.addProvider(new BouncyCastleProvider())
+
+ // Any method the logger does not define (e.g. logger.mock(...), logger.expected(...)) is routed
+ // to logger.info with the upper-cased method name as a bracketed prefix
+ logger.metaClass.methodMissing = { String name, args ->
+ logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+ }
+
+ // Map-coerced KeyProvider that knows exactly one key, KEY_ID
+ mockKeyProvider = [
+ // Returns an AES key built from KEY_HEX for KEY_ID; any other ID raises KeyManagementException
+ getKey : { String keyId ->
+ logger.mock("Requesting key ID: ${keyId}")
+ if (keyId == KEY_ID) {
+ new SecretKeySpec(Hex.decode(KEY_HEX), "AES")
+ } else {
+ throw new KeyManagementException("${keyId} is not available")
+ }
+ },
+ // Always reports the single known key ID
+ getAvailableKeyIds: { ->
+ logger.mock("Available key IDs: [${KEY_ID}]")
+ [KEY_ID]
+ },
+ // True only for KEY_ID
+ keyExists : { String keyId ->
+ logger.mock("Checking availability of key ID: ${keyId}")
+ keyId == KEY_ID
+ }] as KeyProvider
+ provenanceEventEncryptor.initialize(mockKeyProvider)
+ }
+
+ /**
+  * Resets the shared ID generator and points the journal and TOC files at a fresh,
+  * randomly named directory under {@code target/storage}.
+  */
+ @Before
+ void setUp() throws Exception {
+     idGenerator.set(0L)
+     journalFile = new File("target/storage/${UUID.randomUUID()}/testEventIdFirstSchemaRecordReaderWriter")
+     tocFile = TocUtil.getTocFile(journalFile)
+ }
+
+ /**
+  * Best-effort removal of the per-test journal directory; a failure is logged and never rethrown.
+  */
+ @After
+ void tearDown() throws Exception {
+     try {
+         final File perTestDirectory = journalFile.getParentFile()
+         FileUtils.deleteFile(perTestDirectory, true)
+     } catch (Exception deletionFailure) {
+         logger.error(deletionFailure.getMessage())
+     }
+ }
+
+ /**
+  * Puts the provenance log-level system property back to the state it had before
+  * setUpOnce() ran and removes the shared {@code target/storage} directory.
+  */
+ @AfterClass
+ static void tearDownOnce() throws Exception {
+     final String logLevelProperty = "org.slf4j.simpleLogger.log.org.apache.nifi.provenance"
+     if (ORIGINAL_LOG_LEVEL != null) {
+         System.setProperty(logLevelProperty, ORIGINAL_LOG_LEVEL)
+     } else {
+         // The property was unset before setUpOnce() forced it to DEBUG, so remove it again
+         // instead of leaving DEBUG in place for every test class that runs afterwards
+         System.clearProperty(logLevelProperty)
+     }
+     try {
+         FileUtils.deleteFile(new File("target/storage"), true)
+     } catch (Exception e) {
+         // Cleanup is best effort. Use the (String, Throwable) overload so the failure is
+         // reported at ERROR level together with its stack trace
+         logger.error("Could not delete target/storage", e)
+     }
+ }
+
+ /**
+  * @return true when the JCE policy permits AES keys longer than 128 bits
+  */
+ private static boolean isUnlimitedStrengthCryptoAvailable() {
+     final int maxAesKeyLength = Cipher.getMaxAllowedKeyLength("AES")
+     return maxAesKeyLength > 128
+ }
+
+ /**
+  * Builds a mock {@link FlowFile} that is guaranteed to carry a {@code uuid} attribute.
+  *
+  * @param attributes attributes for the flowfile; may be null or empty, and is never modified
+  * @param id the flowfile ID (defaults to the next value of the shared ID generator)
+  * @param fileSize the flowfile size (defaults to 3000)
+  * @return the flowfile produced by {@code TestUtil.createFlowFile}
+  */
+ private static
+ final FlowFile buildFlowFile(Map attributes = [:], long id = idGenerator.getAndIncrement(), long fileSize = 3000L) {
+     // Work on a copy: a null argument no longer causes an NPE on assignment and the
+     // caller's map is not silently extended with a generated uuid
+     final Map flowFileAttributes = attributes ? new HashMap(attributes) : new HashMap()
+     if (!flowFileAttributes.uuid) {
+         flowFileAttributes.uuid = UUID.randomUUID().toString()
+     }
+     createFlowFile(id, fileSize, flowFileAttributes)
+ }
+
+ /**
+  * Assembles a {@link ProvenanceEventRecord} through {@code StandardProvenanceEventRecord.Builder}.
+  * Every argument is optional and falls back to the class-level test constants.
+  */
+ private static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
+     eventBuilder.setEventTime(eventTime)
+     eventBuilder.setEventType(eventType)
+     eventBuilder.setTransitUri(transitUri)
+     eventBuilder.fromFlowFile(flowfile)
+     eventBuilder.setComponentId(componentId)
+     eventBuilder.setComponentType(componentType)
+     return eventBuilder.build()
+ }
+
+ /**
+  * Creates the writer under test, using the class-wide encryptor.
+  */
+ @Override
+ protected RecordWriter createWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+     return createWriter(file, tocWriter, compressed, uncompressedBlockSize, provenanceEventEncryptor)
+ }
+
+ /**
+  * Creates an {@code EncryptedSchemaRecordWriter} backed by the shared ID generator and the given encryptor.
+  */
+ protected static RecordWriter createWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize, ProvenanceEventEncryptor encryptor) throws IOException {
+     final RecordWriter encryptedWriter = new EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY, encryptor, 1)
+     return encryptedWriter
+ }
+
+ /**
+  * Creates the reader under test: an {@code EncryptedSchemaRecordReader} using the class-wide encryptor.
+  */
+ @Override
+ protected RecordReader createReader(final InputStream inputStream, final String journalFilename, final TocReader tocReader, final int maxAttributeSize) throws IOException {
+     final RecordReader encryptedReader = new EncryptedSchemaRecordReader(inputStream, journalFilename, tocReader, maxAttributeSize, provenanceEventEncryptor)
+     return encryptedReader
+ }
+
+ /**
+  * Build a record and write it to the repository with the encrypted writer. Recover with the encrypted reader and verify.
+  */
+ @Test
+ void testShouldWriteAndReadEncryptedRecord() {
+     // Arrange
+     final ProvenanceEventRecord record = buildEventRecord()
+     logger.info("Built sample PER: ${record}")
+
+     TocWriter tocWriter = new StandardTocWriter(tocFile, false, false)
+
+     RecordWriter encryptedWriter = createWriter(journalFile, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE)
+     logger.info("Generated encrypted writer: ${encryptedWriter}")
+
+     // Act
+     int encryptedRecordId = idGenerator.get()
+     encryptedWriter.writeHeader(encryptedRecordId)
+     encryptedWriter.writeRecord(record)
+     encryptedWriter.close()
+     logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
+
+     // Assert
+     TocReader tocReader = new StandardTocReader(tocFile)
+     final FileInputStream fis = new FileInputStream(journalFile)
+     RecordReader reader = null
+     try {
+         reader = createReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE)
+         logger.info("Generated encrypted reader: ${reader}")
+
+         ProvenanceEventRecord encryptedEvent = reader.nextRecord()
+         assert encryptedEvent
+         assert encryptedRecordId as long == encryptedEvent.getEventId()
+         assert record.componentId == encryptedEvent.getComponentId()
+         assert record.componentType == encryptedEvent.getComponentType()
+         logger.info("Successfully read encrypted record: ${encryptedEvent}")
+
+         assert !reader.nextRecord()
+     } finally {
+         // Release the journal handles so tearDown() is able to delete the storage directory
+         try {
+             reader?.close()
+         } finally {
+             fis.close()
+         }
+     }
+ }
+
+ /**
+  * Build a record and write it with a standard writer and the encrypted writer to different repositories. Recover with the standard reader and the contents of the encrypted record should be unreadable.
+  */
+ @Test
+ void testShouldWriteEncryptedRecordAndPlainRecord() {
+     // Arrange
+     final ProvenanceEventRecord record = buildEventRecord()
+     logger.info("Built sample PER: ${record}")
+
+     TocWriter tocWriter = new StandardTocWriter(tocFile, false, false)
+
+     RecordWriter standardWriter = new EventIdFirstSchemaRecordWriter(journalFile, idGenerator, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE, IdentifierLookup.EMPTY)
+     logger.info("Generated standard writer: ${standardWriter}")
+
+     File encryptedJournalFile = new File(journalFile.absolutePath + "_encrypted")
+     File encryptedTocFile = TocUtil.getTocFile(encryptedJournalFile)
+     TocWriter encryptedTocWriter = new StandardTocWriter(encryptedTocFile, false, false)
+     RecordWriter encryptedWriter = createWriter(encryptedJournalFile, encryptedTocWriter, false, UNCOMPRESSED_BLOCK_SIZE)
+     logger.info("Generated encrypted writer: ${encryptedWriter}")
+
+     // Act
+     int standardRecordId = idGenerator.get()
+     standardWriter.writeHeader(standardRecordId)
+     standardWriter.writeRecord(record)
+     standardWriter.close()
+     logger.info("Wrote standard record ${standardRecordId} to journal")
+
+     int encryptedRecordId = idGenerator.get()
+     encryptedWriter.writeHeader(encryptedRecordId)
+     encryptedWriter.writeRecord(record)
+     encryptedWriter.close()
+     logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
+
+     // Assert
+     TocReader tocReader = new StandardTocReader(tocFile)
+     final FileInputStream fis = new FileInputStream(journalFile)
+     RecordReader reader = null
+     try {
+         reader = new EventIdFirstSchemaRecordReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE)
+         logger.info("Generated standard reader: ${reader}")
+
+         ProvenanceEventRecord standardEvent = reader.nextRecord()
+         assert standardEvent
+         assert standardRecordId as long == standardEvent.getEventId()
+         assert record.componentId == standardEvent.getComponentId()
+         assert record.componentType == standardEvent.getComponentType()
+         logger.info("Successfully read standard record: ${standardEvent}")
+
+         assert !reader.nextRecord()
+     } finally {
+         // Release the plain journal handles so tearDown() is able to delete the storage directory
+         try {
+             reader?.close()
+         } finally {
+             fis.close()
+         }
+     }
+
+     // Demonstrate unable to read from encrypted file with standard reader
+     TocReader incompatibleTocReader = new StandardTocReader(encryptedTocFile)
+     final FileInputStream efis = new FileInputStream(encryptedJournalFile)
+     RecordReader incompatibleReader = null
+     try {
+         incompatibleReader = new EventIdFirstSchemaRecordReader(efis, encryptedJournalFile.getName(), incompatibleTocReader, MAX_ATTRIBUTE_SIZE)
+         logger.info("Generated standard reader (attempting to read encrypted file): ${incompatibleReader}")
+
+         def msg = shouldFail(EOFException) {
+             incompatibleReader.nextRecord()
+         }
+         logger.expected(msg)
+         assert msg =~ "EOFException: Failed to read field"
+     } finally {
+         // Release the encrypted journal handles as well
+         try {
+             incompatibleReader?.close()
+         } finally {
+             efis.close()
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
new file mode 100644
index 0000000..42cc881
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance
+
+import org.apache.nifi.events.EventReporter
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.serialization.RecordReaders
+import org.apache.nifi.reporting.Severity
+import org.apache.nifi.util.file.FileUtils
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.AfterClass
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.ClassRule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import javax.crypto.Cipher
+import java.security.Security
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile
+
+@RunWith(JUnit4.class)
+class EncryptedWriteAheadProvenanceRepositoryTest {
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedWriteAheadProvenanceRepositoryTest.class)
+
+ private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
+ private static final String KEY_HEX_256 = KEY_HEX_128 * 2
+ private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
+ private static final String KEY_ID = "K1"
+
+ private static final String TRANSIT_URI = "nifi://unit-test"
+ private static final String PROCESSOR_TYPE = "Mock Processor"
+ private static final String COMPONENT_ID = "1234"
+
+ private static final AtomicLong recordId = new AtomicLong()
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder()
+
+ private ProvenanceRepository repo
+ private static RepositoryConfiguration config
+
+ public static final int DEFAULT_ROLLOVER_MILLIS = 2000
+ private EventReporter eventReporter
+ private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
+
+ private static String ORIGINAL_LOG_LEVEL
+
+ /**
+ * One-time fixture: raises the provenance log level, registers the BouncyCastle provider
+ * and installs a catch-all logging hook on the logger.
+ */
+ @BeforeClass
+ static void setUpOnce() throws Exception {
+ // Remember the current value of the property so tearDownOnce() can restore it
+ ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
+
+ Security.addProvider(new BouncyCastleProvider())
+
+ // Any method the logger does not define (e.g. logger.mock(...)) is routed to logger.info
+ // with the upper-cased method name as a bracketed prefix
+ logger.metaClass.methodMissing = { String name, args ->
+ logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+ }
+ }
+
+ /**
+  * Empties the list of captured events and installs a fresh mock event reporter.
+  */
+ @Before
+ void setUp() throws Exception {
+     if (reportedEvents != null) {
+         reportedEvents.clear()
+     }
+     eventReporter = createMockEventReporter()
+ }
+
+ /**
+  * Closes the repository and deletes its storage, then resets the static encryption
+  * flags on {@link RecordReaders} that the encrypted tests override.
+  */
+ @After
+ void tearDown() throws Exception {
+     try {
+         closeRepo(repo, config)
+     } finally {
+         // Reset the boolean determiner even when cleanup throws, so the overridden
+         // static flags cannot leak into tests that run afterwards
+         RecordReaders.encryptionPropertiesRead = false
+         RecordReaders.isEncryptionAvailable = false
+     }
+ }
+
+ /**
+  * Puts the provenance log-level system property back to the state it had before setUpOnce() ran.
+  */
+ @AfterClass
+ static void tearDownOnce() throws Exception {
+     final String logLevelProperty = "org.slf4j.simpleLogger.log.org.apache.nifi.provenance"
+     if (ORIGINAL_LOG_LEVEL != null) {
+         System.setProperty(logLevelProperty, ORIGINAL_LOG_LEVEL)
+     } else {
+         // The property was unset before setUpOnce() forced it to DEBUG, so remove it again
+         // instead of leaving DEBUG in place for every test class that runs afterwards
+         System.clearProperty(logLevelProperty)
+     }
+ }
+
+ /**
+  * @return true when the JCE policy permits AES keys longer than 128 bits
+  */
+ private static boolean isUnlimitedStrengthCryptoAvailable() {
+     final int maxAesKeyLength = Cipher.getMaxAllowedKeyLength("AES")
+     return maxAesKeyLength > 128
+ }
+
+ /**
+  * Creates a configuration with a single, randomly named storage directory under
+  * {@code target/storage}, compression on rollover, a 2000 second event file life
+  * and 100-byte compression blocks.
+  */
+ private static RepositoryConfiguration createConfiguration() {
+     final File storageDirectory = new File("target/storage/" + UUID.randomUUID().toString())
+     final RepositoryConfiguration repositoryConfiguration = new RepositoryConfiguration()
+     repositoryConfiguration.addStorageDirectory("1", storageDirectory)
+     repositoryConfiguration.setCompressOnRollover(true)
+     repositoryConfiguration.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
+     repositoryConfiguration.setCompressionBlockBytes(100)
+     return repositoryConfiguration
+ }
+
+ /**
+  * Extends the base configuration with the single test key, its key ID and
+  * {@code StaticKeyProvider} as the key provider implementation.
+  */
+ private static RepositoryConfiguration createEncryptedConfiguration() {
+     final RepositoryConfiguration encryptedConfiguration = createConfiguration()
+     final Map<String, String> keys = [(KEY_ID): KEY_HEX]
+     encryptedConfiguration.setEncryptionKeys(keys)
+     encryptedConfiguration.setKeyId(KEY_ID)
+     encryptedConfiguration.setKeyProviderImplementation(StaticKeyProvider.class.getName())
+     return encryptedConfiguration
+ }
+
+ /**
+  * Builds a map-coerced {@link EventReporter} whose {@code reportEvent} records every
+  * call in {@code reportedEvents} and logs it.
+  */
+ private EventReporter createMockEventReporter() {
+     final Closure recordReportedEvent = { Severity severity, String category, String message ->
+         final ReportedEvent reported = new ReportedEvent(severity, category, message)
+         reportedEvents.add(reported)
+         logger.mock("Added ${reported}")
+     }
+     return [reportEvent: recordReportedEvent] as EventReporter
+ }
+
+ /**
+  * Closes the given repository and deletes every storage directory of the given configuration.
+  * Does nothing when {@code repo} is null.
+  *
+  * @param repo the repository to close (defaults to the one used by the current test)
+  * @param config the configuration whose storage directories are removed; skipped when null
+  * @throws IOException if a storage directory still cannot be deleted on the final attempt
+  */
+ private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException {
+     if (repo == null) {
+         return
+     }
+
+     try {
+         repo.close()
+     } catch (IOException ioe) {
+         // Closing is best effort, but leave a trace instead of swallowing the failure silently
+         logger.warn("Failed to close provenance repository: " + ioe.getMessage())
+     }
+
+     if (config == null) {
+         return
+     }
+
+     // Delete all of the storage files. We do this in order to clean up the tons of files that
+     // we create but also to ensure that we have closed all of the file handles. If we leave any
+     // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+     for (final File storageDir : config.getStorageDirectories().values()) {
+         for (int attempt = 0; attempt < 3; attempt++) {
+             try {
+                 FileUtils.deleteFile(storageDir, true)
+                 break
+             } catch (IOException ioe) {
+                 if (attempt == 2) {
+                     throw ioe
+                 }
+                 // if there is a virus scanner, etc. running in the background we may not be able to
+                 // delete the file. Retry once right away, then wait a sec before the next round.
+                 logger.warn("file: " + storageDir.toString() + " exists=" + storageDir.exists())
+                 try {
+                     FileUtils.deleteFile(storageDir, true)
+                     break
+                 } catch (final IOException ignored) {
+                     try {
+                         Thread.sleep(1000L)
+                     } catch (final InterruptedException ie) {
+                         // Preserve the interrupt status for whoever is running the test
+                         Thread.currentThread().interrupt()
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Builds a mock {@link FlowFile} that is guaranteed to carry a {@code uuid} attribute.
+  *
+  * @param attributes attributes for the flowfile; may be null or empty, and is never modified
+  * @param id the flowfile ID (defaults to the next value of the shared record ID counter)
+  * @param fileSize the flowfile size (defaults to 3000)
+  * @return the flowfile produced by {@code TestUtil.createFlowFile}
+  */
+ private static
+ final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) {
+     // Work on a copy: a null argument no longer causes an NPE on assignment and the
+     // caller's map is not silently extended with a generated uuid
+     final Map flowFileAttributes = attributes ? new HashMap(attributes) : new HashMap()
+     if (!flowFileAttributes.uuid) {
+         flowFileAttributes.uuid = UUID.randomUUID().toString()
+     }
+     createFlowFile(id, fileSize, flowFileAttributes)
+ }
+
+ /**
+  * Assembles a {@link ProvenanceEventRecord} through {@code StandardProvenanceEventRecord.Builder}.
+  * Every argument is optional and falls back to the class-level test constants.
+  */
+ private static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
+     eventBuilder.setEventTime(eventTime)
+     eventBuilder.setEventType(eventType)
+     eventBuilder.setTransitUri(transitUri)
+     eventBuilder.fromFlowFile(flowfile)
+     eventBuilder.setComponentId(componentId)
+     eventBuilder.setComponentType(componentType)
+     return eventBuilder.build()
+ }
+
+ /**
+  * Registers ten copies of one event with a {@link PersistentProvenanceRepository} and checks
+  * that they can be read back, confirming that the existing implementation still works.
+  *
+  * @throws IOException declared by the test signature
+  * @throws InterruptedException declared by the test signature
+  */
+ @Test
+ void testPersistentProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
+     // Arrange
+     final int RECORD_COUNT = 10
+
+     config = createConfiguration()
+     config.setMaxEventFileCapacity(1L)
+     config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+     repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS)
+     repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+     Map attributes = ["abc": "xyz",
+                       "123": "456"]
+     final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+     // Act
+     for (int registered = 0; registered < RECORD_COUNT; registered++) {
+         repo.registerEvent(record)
+     }
+
+     // Sleep to let the journal merge occur
+     Thread.sleep(1000L)
+
+     final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
+
+     logger.info("Recovered ${recoveredRecords.size()} events: ")
+     for (ProvenanceEventRecord recoveredRecord : recoveredRecords) {
+         logger.info("\t${recoveredRecord}")
+     }
+
+     // Assert
+     assert recoveredRecords.size() == RECORD_COUNT
+     for (int index = 0; index < recoveredRecords.size(); index++) {
+         final ProvenanceEventRecord recovered = recoveredRecords.get(index)
+         assert recovered.getEventId() == (index as Long)
+         assert recovered.getTransitUri() == TRANSIT_URI
+         assert recovered.getEventType() == ProvenanceEventType.RECEIVE
+         // The UUID was added later but we care that all attributes we provided are still there
+         assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
+     }
+ }
+
+ /**
+  * Registers ten copies of one event with a {@link WriteAheadProvenanceRepository} and checks
+  * that they can be read back, confirming that the existing implementation still works.
+  *
+  * @throws IOException declared by the test signature
+  * @throws InterruptedException declared by the test signature
+  */
+ @Test
+ void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
+     // Arrange
+     final int RECORD_COUNT = 10
+
+     config = createConfiguration()
+     // Needed until NIFI-3605 is implemented
+ //        config.setMaxEventFileCapacity(1L)
+     config.setMaxEventFileCount(1)
+     config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+     repo = new WriteAheadProvenanceRepository(config)
+     repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+     Map attributes = ["abc": "xyz",
+                       "123": "456"]
+     final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+     // Act
+     for (int registered = 0; registered < RECORD_COUNT; registered++) {
+         repo.registerEvent(record)
+     }
+
+     final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
+
+     logger.info("Recovered ${recoveredRecords.size()} events: ")
+     for (ProvenanceEventRecord recoveredRecord : recoveredRecords) {
+         logger.info("\t${recoveredRecord}")
+     }
+
+     // Assert
+     assert recoveredRecords.size() == RECORD_COUNT
+     for (int index = 0; index < recoveredRecords.size(); index++) {
+         final ProvenanceEventRecord recovered = recoveredRecords.get(index)
+         assert recovered.getEventId() == (index as Long)
+         assert recovered.getTransitUri() == TRANSIT_URI
+         assert recovered.getEventType() == ProvenanceEventType.RECEIVE
+         // The UUID was added later but we care that all attributes we provided are still there
+         assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
+     }
+ }
+
+ /**
+  * Registers a single event with the encrypted repository and reads it back by ID.
+  */
+ @Test
+ void testShouldRegisterAndGetEvent() {
+     // Arrange
+
+     // Override the boolean determiner
+     RecordReaders.encryptionPropertiesRead = true
+     RecordReaders.isEncryptionAvailable = true
+
+     config = createEncryptedConfiguration()
+     // Needed until NIFI-3605 is implemented
+ //        config.setMaxEventFileCapacity(1L)
+     config.setMaxEventFileCount(1)
+     config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+     repo = new EncryptedWriteAheadProvenanceRepository(config)
+     repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+     Map attributes = ["abc": "This is a plaintext attribute.",
+                       "123": "This is another plaintext attribute."]
+     final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+     final long LAST_RECORD_ID = repo.getMaxEventId()
+     final long expectedEventId = LAST_RECORD_ID + 1
+
+     // Act
+     repo.registerEvent(record)
+
+     // Retrieve the event through the interface
+     ProvenanceEventRecord recoveredRecord = repo.getEvent(expectedEventId)
+     logger.info("Recovered ${recoveredRecord}")
+
+     // Assert
+     assert recoveredRecord.getEventId() == expectedEventId
+     assert recoveredRecord.getTransitUri() == TRANSIT_URI
+     assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
+     // The UUID was added later but we care that all attributes we provided are still there
+     assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
+ }
+
+ /**
+  * Registers a batch of events with the encrypted repository and reads them back as a range,
+  * checking both the number of recovered events and the content of each one.
+  */
+ @Test
+ void testShouldRegisterAndGetEvents() {
+     // Arrange
+     final int RECORD_COUNT = 10
+
+     // Override the boolean determiner
+     RecordReaders.encryptionPropertiesRead = true
+     RecordReaders.isEncryptionAvailable = true
+
+     config = createEncryptedConfiguration()
+     // Needed until NIFI-3605 is implemented
+ //        config.setMaxEventFileCapacity(1L)
+     config.setMaxEventFileCount(1)
+     config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+     repo = new EncryptedWriteAheadProvenanceRepository(config)
+     repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+     Map attributes = ["abc": "This is a plaintext attribute.",
+                       "123": "This is another plaintext attribute."]
+     final List<ProvenanceEventRecord> records = []
+     RECORD_COUNT.times { int i ->
+         records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
+     }
+     logger.info("Generated ${RECORD_COUNT} records")
+
+     final long LAST_RECORD_ID = repo.getMaxEventId()
+
+     // Act
+     repo.registerEvents(records)
+     logger.info("Registered events")
+
+     // Retrieve the events through the interface
+     List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
+     logger.info("Recovered ${recoveredRecords.size()} records")
+
+     // Assert
+     // Without this check the per-record assertions below never run for an empty result,
+     // and the test would pass even though nothing was recovered
+     assert recoveredRecords.size() == RECORD_COUNT
+     recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
+         assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i
+         assert recoveredRecord.getTransitUri() == TRANSIT_URI
+         assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
+         // The UUID was added later but we care that all attributes we provided are still there
+         assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
+         assert recoveredRecord.getAttribute("count") == i as String
+     }
+ }
+
+ /**
+  * Holder with final fields for the three arguments of one {@code reportEvent} call
+  * captured by the mock event reporter.
+  */
+ private static class ReportedEvent {
+     final Severity severity
+     final String category
+     final String message
+
+     ReportedEvent(final Severity severity, final String category, final String message) {
+         this.message = message
+         this.category = category
+         this.severity = severity
+     }
+
+     /**
+      * @return the severity, category and message in a single log-friendly line
+      */
+     @Override
+     String toString() {
+         return "ReportedEvent [${severity}] ${category}: ${message}"
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
index 36397c4..4b2ca50 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
-
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.StandardTocReader;
@@ -67,19 +66,25 @@ public abstract class AbstractTestRecordReaderWriter {
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
+ final String expectedTransitUri = "nifi://unit-test";
+ final int expectedBlockIndex = 0;
+
+ assertRecoveredRecord(journalFile, tocReader, expectedTransitUri, expectedBlockIndex);
+
+ FileUtils.deleteFile(journalFile.getParentFile(), true);
+ }
+ private void assertRecoveredRecord(File journalFile, TocReader tocReader, String expectedTransitUri, int expectedBlockIndex) throws IOException {
try (final FileInputStream fis = new FileInputStream(journalFile);
- final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
+ final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+ assertEquals(expectedBlockIndex, reader.getBlockIndex());
+ reader.skipToBlock(expectedBlockIndex);
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
- assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertEquals(expectedTransitUri, recovered.getTransitUri());
assertNull(reader.nextRecord());
}
-
- FileUtils.deleteFile(journalFile.getParentFile(), true);
}
@@ -96,16 +101,7 @@ public abstract class AbstractTestRecordReaderWriter {
final TocReader tocReader = new StandardTocReader(tocFile);
- try (final FileInputStream fis = new FileInputStream(journalFile);
- final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
- final StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
-
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertNull(reader.nextRecord());
- }
+ assertRecoveredRecord(journalFile, tocReader, "nifi://unit-test", 0);
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index f08fed4..c3fbf42 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -16,6 +16,25 @@
*/
package org.apache.nifi.provenance;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
@@ -42,26 +61,6 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.RingBuffer.IterationDirection;
import org.apache.nifi.web.ResourceNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
public class VolatileProvenanceRepository implements ProvenanceRepository {
// properties
@@ -472,7 +471,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
}
public Lineage computeLineage(final String flowFileUUID, final NiFiUser user) throws IOException {
- return computeLineage(Collections.<String>singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null);
+ return computeLineage(Collections.singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null);
}
private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) throws IOException {
@@ -497,7 +496,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
final ProvenanceEventRecord event = getEvent(eventId);
if (event == null) {
final String userId = user.getIdentity();
- final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, userId);
+ final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, userId);
result.getResult().setError("Could not find event with ID " + eventId);
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
return result;
@@ -541,9 +540,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
final ProvenanceEventRecord event = getEvent(eventId, user);
if (event == null) {
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId);
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
- submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L);
+ submission.getResult().update(Collections.emptyList(), 0L);
return submission;
}
@@ -554,7 +553,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
case CLONE:
return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId);
default: {
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId);
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
return submission;
@@ -572,9 +571,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
final ProvenanceEventRecord event = getEvent(eventId, user);
if (event == null) {
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId);
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
- submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L);
+ submission.getResult().update(Collections.emptyList(), 0L);
return submission;
}
@@ -585,7 +584,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
case CLONE:
return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId);
default: {
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId);
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId);
lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
return submission;
@@ -873,5 +872,15 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
public Long getPreviousContentClaimOffset() {
return record.getPreviousContentClaimOffset();
}
+
+ /**
+ * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
+ *
+ * @return a descriptive event ID to allow tracing
+ */
+ @Override
+ public String getBestEventIdentifier() {
+ return Long.toString(getEventId());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d0a1f1c..0b206e2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -437,7 +437,7 @@
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
<exclude>src/test/resources/TestExtractGrok/patterns</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
- <exclude>src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java</exclude>
+ <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
index 103790e..db6d9ba 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
@@ -16,6 +16,16 @@
*/
package org.apache.nifi.processors.standard;
+import java.nio.charset.StandardCharsets;
+import java.security.Security;
+import java.text.Normalizer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
@@ -41,27 +51,16 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.crypto.CipherUtility;
-import org.apache.nifi.processors.standard.util.crypto.KeyedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.OpenPGPKeyBasedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.OpenPGPPasswordBasedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.PasswordBasedEncryptor;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.security.util.KeyDerivationFunction;
+import org.apache.nifi.security.util.crypto.CipherUtility;
+import org.apache.nifi.security.util.crypto.KeyedEncryptor;
+import org.apache.nifi.security.util.crypto.OpenPGPKeyBasedEncryptor;
+import org.apache.nifi.security.util.crypto.OpenPGPPasswordBasedEncryptor;
+import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor;
import org.apache.nifi.util.StopWatch;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import java.nio.charset.StandardCharsets;
-import java.security.Security;
-import java.text.Normalizer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
@EventDriven
@SideEffectFree
@SupportsBatching
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
deleted file mode 100644
index 907aed2..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util.crypto;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.security.util.EncryptionMethod;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.crypto.Cipher;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.IvParameterSpec;
-import java.io.UnsupportedEncodingException;
-import java.security.InvalidAlgorithmParameterException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.SecureRandom;
-import java.security.spec.InvalidKeySpecException;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * This is a standard implementation of {@link KeyedCipherProvider} which supports {@code AES} cipher families with arbitrary modes of operation (currently only {@code CBC}, {@code CTR}, and {@code
- * GCM} are supported as {@link EncryptionMethod}s.
- */
-public class AESKeyedCipherProvider extends KeyedCipherProvider {
- private static final Logger logger = LoggerFactory.getLogger(AESKeyedCipherProvider.class);
- private static final int IV_LENGTH = 16;
- private static final List<Integer> VALID_KEY_LENGTHS = Arrays.asList(128, 192, 256);
-
- /**
- * Returns an initialized cipher for the specified algorithm. The IV is provided externally to allow for non-deterministic IVs, as IVs
- * deterministically derived from the password are a potential vulnerability and compromise semantic security. See
- * <a href="http://crypto.stackexchange.com/a/3970/12569">Ilmari Karonen's answer on Crypto Stack Exchange</a>
- *
- * @param encryptionMethod the {@link EncryptionMethod}
- * @param key the key
- * @param iv the IV or nonce (cannot be all 0x00)
- * @param encryptMode true for encrypt, false for decrypt
- * @return the initialized cipher
- * @throws Exception if there is a problem initializing the cipher
- */
- @Override
- public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv, boolean encryptMode) throws Exception {
- try {
- return getInitializedCipher(encryptionMethod, key, iv, encryptMode);
- } catch (IllegalArgumentException e) {
- throw e;
- } catch (Exception e) {
- throw new ProcessException("Error initializing the cipher", e);
- }
- }
-
- /**
- * Returns an initialized cipher for the specified algorithm. The IV will be generated internally (for encryption). If decryption is requested, it will throw an exception.
- *
- * @param encryptionMethod the {@link EncryptionMethod}
- * @param key the key
- * @param encryptMode true for encrypt, false for decrypt
- * @return the initialized cipher
- * @throws Exception if there is a problem initializing the cipher or if decryption is requested
- */
- @Override
- public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, boolean encryptMode) throws Exception {
- return getCipher(encryptionMethod, key, new byte[0], encryptMode);
- }
-
- protected Cipher getInitializedCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv,
- boolean encryptMode) throws NoSuchAlgorithmException, NoSuchProviderException,
- InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, UnsupportedEncodingException {
- if (encryptionMethod == null) {
- throw new IllegalArgumentException("The encryption method must be specified");
- }
-
- if (!encryptionMethod.isKeyedCipher()) {
- throw new IllegalArgumentException(encryptionMethod.name() + " requires a PBECipherProvider");
- }
-
- String algorithm = encryptionMethod.getAlgorithm();
- String provider = encryptionMethod.getProvider();
-
- if (key == null) {
- throw new IllegalArgumentException("The key must be specified");
- }
-
- if (!isValidKeyLength(key)) {
- throw new IllegalArgumentException("The key must be of length [" + StringUtils.join(VALID_KEY_LENGTHS, ", ") + "]");
- }
-
- Cipher cipher = Cipher.getInstance(algorithm, provider);
- final String operation = encryptMode ? "encrypt" : "decrypt";
-
- boolean ivIsInvalid = false;
-
- // If an IV was not provided already, generate a random IV and inject it in the cipher
- int ivLength = cipher.getBlockSize();
- if (iv.length != ivLength) {
- logger.warn("An IV was provided of length {} bytes for {}ion but should be {} bytes", iv.length, operation, ivLength);
- ivIsInvalid = true;
- }
-
- final byte[] emptyIv = new byte[ivLength];
- if (Arrays.equals(iv, emptyIv)) {
- logger.warn("An empty IV was provided of length {} for {}ion", iv.length, operation);
- ivIsInvalid = true;
- }
-
- if (ivIsInvalid) {
- if (encryptMode) {
- logger.warn("Generating new IV. The value can be obtained in the calling code by invoking 'cipher.getIV()';");
- iv = generateIV();
- } else {
- // Can't decrypt without an IV
- throw new IllegalArgumentException("Cannot decrypt without a valid IV");
- }
- }
- cipher.init(encryptMode ? Cipher.ENCRYPT_MODE : Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
-
- return cipher;
- }
-
- private boolean isValidKeyLength(SecretKey key) {
- return VALID_KEY_LENGTHS.contains(key.getEncoded().length * 8);
- }
-
- /**
- * Generates a new random IV of 16 bytes using {@link java.security.SecureRandom}.
- *
- * @return the IV
- */
- public byte[] generateIV() {
- byte[] iv = new byte[IV_LENGTH];
- new SecureRandom().nextBytes(iv);
- return iv;
- }
-}